2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually, DeadReferenceError, \
9 RemoteException
11 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
12 DownloadStopped, MDMF_VERSION, SDMF_VERSION
13 from allmydata.util.assertutil import _assert
14 from allmydata.util import hashutil, log, mathutil, deferredutil
15 from allmydata.util.dictutil import DictOfSets
16 from allmydata import hashtree, codec
17 from allmydata.storage.server import si_b2a
18 from pycryptopp.cipher.aes import AES
19 from pycryptopp.publickey import rsa
21 from allmydata.mutable.common import CorruptShareError, BadShareError, \
22 UncoordinatedWriteError
23 from allmydata.mutable.layout import MDMFSlotReadProxy
25 class RetrieveStatus:
26 implements(IRetrieveStatus)
27 statusid_counter = count(0)
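# Note: statusid_counter is a class-level itertools.count shared by every
# RetrieveStatus instance; each new status object draws the next value from it
# (see self.counter below) so the status display can tell downloads apart.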
28 def __init__(self):
29 self.timings = {}
30 self.timings["fetch_per_server"] = {}
31 self.timings["decode"] = 0.0
32 self.timings["decrypt"] = 0.0
33 self.timings["cumulative_verify"] = 0.0
34 self._problems = {}
35 self.active = True
36 self.storage_index = None
37 self.helper = False
38 self.encoding = ("?","?")
39 self.size = None
40 self.status = "Not started"
41 self.progress = 0.0
42 self.counter = self.statusid_counter.next()
43 self.started = time.time()
45 def get_started(self):
46 return self.started
47 def get_storage_index(self):
48 return self.storage_index
49 def get_encoding(self):
50 return self.encoding
51 def using_helper(self):
52 return self.helper
57 def get_progress(self):
58 return self.progress
61 def get_counter(self):
62 return self.counter
63 def get_problems(self):
64 return self._problems
66 def add_fetch_timing(self, server, elapsed):
67 if server not in self.timings["fetch_per_server"]:
68 self.timings["fetch_per_server"][server] = []
69 self.timings["fetch_per_server"][server].append(elapsed)
70 def accumulate_decode_time(self, elapsed):
71 self.timings["decode"] += elapsed
72 def accumulate_decrypt_time(self, elapsed):
73 self.timings["decrypt"] += elapsed
74 def set_storage_index(self, si):
75 self.storage_index = si
76 def set_helper(self, helper):
77 self.helper = helper
78 def set_encoding(self, k, n):
79 self.encoding = (k, n)
80 def set_size(self, size):
81 self.size = size
82 def set_status(self, status):
83 self.status = status
84 def set_progress(self, value):
85 self.progress = value
86 def set_active(self, value):
87 self.active = value
88 def add_problem(self, server, f):
89 serverid = server.get_serverid()
90 self._problems[serverid] = f
95 class Retrieve:
96 # this class is currently single-use. Eventually (in MDMF) we will make
97 # it multi-use, in which case you can call download(range) multiple
98 # times, and each will have a separate response chain. However the
99 # Retrieve object will remain tied to a specific version of the file, and
100 # will use a single ServerMap instance.
101 implements(IPushProducer)
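# As an IPushProducer we are registered with the consumer (streaming=True in
# download() below), and the consumer may call pauseProducing /
# resumeProducing / stopProducing on us to throttle or abort the download.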
103 def __init__(self, filenode, storage_broker, servermap, verinfo,
104 fetch_privkey=False, verify=False):
105 self._node = filenode
106 assert self._node.get_pubkey()
107 self._storage_broker = storage_broker
108 self._storage_index = filenode.get_storage_index()
109 assert self._node.get_readkey()
110 self._last_failure = None
111 prefix = si_b2a(self._storage_index)[:5]
112 self._log_number = log.msg("Retrieve(%s): starting" % prefix)
113 self._running = True
114 self._decoding = False
115 self._bad_shares = set()
117 self.servermap = servermap
118 assert self._node.get_pubkey()
119 self.verinfo = verinfo
120 # during repair, we may be called upon to grab the private key, since
121 # it wasn't picked up during a verify=False checker run, and we'll
122 # need it for repair to generate a new version.
123 self._need_privkey = verify or (fetch_privkey
124 and not self._node.get_privkey())
126 if self._need_privkey:
127 # TODO: Evaluate the need for this. We'll use it if we want
128 # to limit how many queries are on the wire for the privkey
129 # at any given time.
130 self._privkey_query_markers = [] # one Marker for each time we've
131 # tried to get the privkey.
133 # verify means that we are using the downloader logic to verify all
134 # of our shares. This tells the downloader a few things.
136 # 1. We need to download all of the shares.
137 # 2. We don't need to decode or decrypt the shares, since our
138 # caller doesn't care about the plaintext, only the
139 # information about which shares are or are not valid.
140 # 3. When we are validating readers, we need to validate the
141 # signature on the prefix. Do we? We already do this in the
142 # servermap update?
143 self._verify = verify
145 self._status = RetrieveStatus()
146 self._status.set_storage_index(self._storage_index)
147 self._status.set_helper(False)
148 self._status.set_progress(0.0)
149 self._status.set_active(True)
150 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
151 offsets_tuple) = self.verinfo
152 self._status.set_size(datalength)
153 self._status.set_encoding(k, N)
155 self._stopped = False
156 self._pause_deferred = None
157 self._offset = None
158 self._read_length = None
159 self.log("got seqnum %d" % self.verinfo[0])
162 def get_status(self):
163 return self._status
165 def log(self, *args, **kwargs):
166 if "parent" not in kwargs:
167 kwargs["parent"] = self._log_number
168 if "facility" not in kwargs:
169 kwargs["facility"] = "tahoe.mutable.retrieve"
170 return log.msg(*args, **kwargs)
172 def _set_current_status(self, state):
173 seg = "%d/%d" % (self._current_segment, self._last_segment)
174 self._status.set_status("segment %s (%s)" % (seg, state))
179 def pauseProducing(self):
180 """
181 I am called by my download target if we have produced too much
182 data for it to handle. I make the downloader stop producing new
183 data until my resumeProducing method is called.
184 """
185 if self._pause_deferred is not None:
186 return
188 # fired when the download is unpaused.
189 self._old_status = self._status.get_status()
190 self._set_current_status("paused")
192 self._pause_deferred = defer.Deferred()
195 def resumeProducing(self):
196 """
197 I am called by my download target once it is ready to begin
198 receiving data again.
199 """
200 if self._pause_deferred is None:
201 return
203 p = self._pause_deferred
204 self._pause_deferred = None
205 self._status.set_status(self._old_status)
207 eventually(p.callback, None)
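# (the paused callback is fired via foolscap's eventually() so delivery
# happens on a later reactor turn rather than re-entrantly inside
# resumeProducing)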
209 def stopProducing(self):
210 self._stopped = True
211 self.resumeProducing()
214 def _check_for_paused(self, res):
215 """
216 I am called just before a write to the consumer. I return a
217 Deferred that eventually fires with the data that is to be
218 written to the consumer. If the download has not been paused,
219 the Deferred fires immediately. Otherwise, the Deferred fires
220 when the downloader is unpaused.
221 """
222 if self._pause_deferred is not None:
223 d = defer.Deferred()
224 self._pause_deferred.addCallback(lambda ignored: d.callback(res))
225 return d
226 return res
228 def _check_for_stopped(self, res):
229 if self._stopped:
230 raise DownloadStopped("our Consumer called stopProducing()")
231 return res
234 def download(self, consumer=None, offset=0, size=None):
235 assert IConsumer.providedBy(consumer) or self._verify
237 if consumer:
238 self._consumer = consumer
239 # we provide IPushProducer, so streaming=True, per
240 # IConsumer.
241 self._consumer.registerProducer(self, streaming=True)
243 self._done_deferred = defer.Deferred()
244 self._offset = offset
245 self._read_length = size
246 self._setup_encoding_parameters()
247 self._setup_download()
248 self.log("starting download")
249 self._started_fetching = time.time()
250 # The download process beyond this is a state machine.
251 # _add_active_servers will select the servers that we want to use
252 # for the download, and then attempt to start downloading. After
253 # each segment, it will check for doneness, reacting to broken
254 # servers and corrupt shares as necessary. If it runs out of good
255 # servers before downloading all of the segments, _done_deferred
256 # will errback. Otherwise, it will eventually callback with the
257 # contents of the mutable file.
258 self.loop()
259 return self._done_deferred
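# A minimal usage sketch (assumed typical caller, not taken from this file):
#   r = Retrieve(filenode, storage_broker, servermap, verinfo)
#   d = r.download(consumer, offset=0, size=None)
#   d.addCallback(lambda res: ...)  # fires once _done() or _error() runs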
261 def loop(self):
262 d = fireEventually(None) # avoid #237 recursion limit problem
263 d.addCallback(lambda ign: self._activate_enough_servers())
264 d.addCallback(lambda ign: self._download_current_segment())
265 # when we're done, _download_current_segment will call _done. If we
266 # aren't, it will call loop() again.
267 d.addErrback(self._error)
269 def _setup_download(self):
270 self._started = time.time()
271 self._status.set_status("Retrieving Shares")
273 # how many shares do we need?
274 (seqnum,
275 root_hash,
276 IV,
277 segsize,
278 datalength,
279 k,
280 N,
281 prefix,
282 offsets_tuple) = self.verinfo
284 # first, which servers can we use?
285 versionmap = self.servermap.make_versionmap()
286 shares = versionmap[self.verinfo]
287 # this sharemap is consumed as we decide to send requests
288 self.remaining_sharemap = DictOfSets()
289 for (shnum, server, timestamp) in shares:
290 self.remaining_sharemap.add(shnum, server)
291 # Reuse the SlotReader from the servermap.
292 key = (self.verinfo, server.get_serverid(),
293 self._storage_index, shnum)
294 if key in self.servermap.proxies:
295 reader = self.servermap.proxies[key]
296 else:
297 reader = MDMFSlotReadProxy(server.get_rref(),
298 self._storage_index, shnum, None)
299 reader.server = server
300 self.readers[shnum] = reader
302 if len(self.remaining_sharemap) < k:
303 self._raise_notenoughshareserror()
305 self.shares = {} # maps shnum to validated blocks
306 self._active_readers = [] # list of active readers for this dl.
307 self._block_hash_trees = {} # shnum => hashtree
309 for i in xrange(self._total_shares):
310 # So we don't have to do this later.
311 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
313 # We need one share hash tree for the entire file; its leaves
314 # are the roots of the block hash trees for the shares that
315 # comprise it, and its root is in the verinfo.
316 self.share_hash_tree = hashtree.IncompleteHashTree(N)
317 self.share_hash_tree.set_hashes({0: root_hash})
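# (hash-tree nodes are numbered with the root at index 0, so seeding index 0
# with root_hash from the already-verified verinfo lets the sharehashes we
# fetch later be checked against a trusted root)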
319 def decode(self, blocks_and_salts, segnum):
320 """
321 I am a helper method that the mutable file update process uses
322 as a shortcut to decode and decrypt the segments that it needs
323 to fetch in order to perform a file update. I take in a
324 collection of blocks and salts, and pick some of those to make a
325 segment with. I return the plaintext associated with that
326 segment.
327 """
328 # We don't need the block hash trees in this case.
329 self._block_hash_trees = None
330 self._setup_encoding_parameters()
332 # _decode_blocks() expects the output of a gatherResults that
333 # contains the outputs of _validate_block() (each of which is a dict
334 # mapping shnum to (block,salt) bytestrings).
335 d = self._decode_blocks([blocks_and_salts], segnum)
336 d.addCallback(self._decrypt_segment)
337 return d
340 def _setup_encoding_parameters(self):
341 """
342 I set up the encoding parameters, including k, n, the number
343 of segments associated with this file, and the segment decoders.
344 """
345 (seqnum,
346 root_hash,
347 IV,
348 segsize,
349 datalength,
350 k,
351 n,
352 prefix,
353 offsets_tuple) = self.verinfo
354 self._required_shares = k
355 self._total_shares = n
356 self._segment_size = segsize
357 self._data_length = datalength
359 if not IV:
360 self._version = MDMF_VERSION
361 else:
362 self._version = SDMF_VERSION
364 if datalength and segsize:
365 self._num_segments = mathutil.div_ceil(datalength, segsize)
366 self._tail_data_size = datalength % segsize
367 else:
368 self._num_segments = 0
369 self._tail_data_size = 0
371 self._segment_decoder = codec.CRSDecoder()
372 self._segment_decoder.set_params(segsize, k, n)
374 if not self._tail_data_size:
375 self._tail_data_size = segsize
377 self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
378 self._required_shares)
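# Worked example (illustrative numbers only): with datalength=1000,
# segsize=300 and k=3, div_ceil(1000, 300) gives 4 segments and
# tail_data_size = 1000 % 300 = 100; next_multiple(100, 3) = 102, so the tail
# segment is padded up to a multiple of k for FEC.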
379 if self._tail_segment_size == self._segment_size:
380 self._tail_decoder = self._segment_decoder
381 else:
382 self._tail_decoder = codec.CRSDecoder()
383 self._tail_decoder.set_params(self._tail_segment_size,
384 self._required_shares,
385 self._total_shares)
387 self.log("got encoding parameters: "
388 "k: %d, "
389 "n: %d, "
390 "%d segments of %d bytes each (%d byte tail segment)" % \
391 (k, n, self._num_segments, self._segment_size,
392 self._tail_segment_size))
394 # Our last task is to tell the downloader where to start and
395 # where to stop. We use three parameters for that:
396 # - self._start_segment: the segment that we need to start
397 # downloading first.
398 # - self._current_segment: the next segment that we need to
399 # download.
400 # - self._last_segment: The last segment that we were asked to
401 # download.
402 #
403 # We say that the download is complete when
404 # self._current_segment > self._last_segment. We use
405 # self._start_segment and self._last_segment to know when to
406 # strip things off of segments, and how much to strip.
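# Illustrative example: with segsize=300, offset=650 and read_length=200,
# start = 650 // 300 = 2, end_data = 850, end = (850 - 1) // 300 = 2, so only
# segment 2 is fetched and then trimmed on both ends by _set_segment().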
407 if self._offset:
408 self.log("got offset: %d" % self._offset)
409 # our start segment is the first segment containing the
410 # offset we were given.
411 start = self._offset // self._segment_size
413 _assert(start < self._num_segments,
414 start=start, num_segments=self._num_segments,
415 offset=self._offset, segment_size=self._segment_size)
416 self._start_segment = start
417 self.log("got start segment: %d" % self._start_segment)
418 else:
419 self._start_segment = 0
422 # If self._read_length is None, then we want to read the whole
423 # file. Otherwise, we want to read only part of the file, and
424 # need to figure out where to stop reading.
425 if self._read_length is not None:
426 # our end segment is the last segment containing part of the
427 # segment that we were asked to read.
428 self.log("got read length %d" % self._read_length)
429 if self._read_length != 0:
430 end_data = self._offset + self._read_length
432 # We don't actually need to read the byte at end_data,
433 # but the one before it.
434 end = (end_data - 1) // self._segment_size
436 _assert(end < self._num_segments,
437 end=end, num_segments=self._num_segments,
438 end_data=end_data, offset=self._offset, read_length=self._read_length,
439 segment_size=self._segment_size)
440 self._last_segment = end
441 else:
442 self._last_segment = self._start_segment
443 self.log("got end segment: %d" % self._last_segment)
444 else:
445 self._last_segment = self._num_segments - 1
447 self._current_segment = self._start_segment
449 def _activate_enough_servers(self):
451 I populate self._active_readers with enough active readers to
452 retrieve the contents of this mutable file. I am called before
453 downloading starts, and (eventually) after each validation
454 error, connection error, or other problem in the download.
456 # TODO: It would be cool to investigate other heuristics for
457 # reader selection. For instance, the cost (in time the user
458 # spends waiting for their file) of selecting a really slow server
459 # that happens to have a primary share is probably more than
460 # selecting a really fast server that doesn't have a primary
461 # share. Maybe the servermap could be extended to provide this
462 # information; it could keep track of latency information while
463 # it gathers more important data, and then this routine could
464 # use that to select active readers.
466 # (these and other questions would be easier to answer with a
467 # robust, configurable tahoe-lafs simulator, which modeled node
468 # failures, differences in node speed, and other characteristics
469 # that we expect storage servers to have. You could have
470 # presets for really stable grids (like allmydata.com),
471 # friendnets, make it easy to configure your own settings, and
472 # then simulate the effect of big changes on these use cases
473 # instead of just reasoning about what the effect might be. Out
474 # of scope for MDMF, though.)
476 # XXX: Why don't format= log messages work here?
478 known_shnums = set(self.remaining_sharemap.keys())
479 used_shnums = set([r.shnum for r in self._active_readers])
480 unused_shnums = known_shnums - used_shnums
482 if self._verify:
483 new_shnums = unused_shnums # use them all
484 elif len(self._active_readers) < self._required_shares:
486 more = self._required_shares - len(self._active_readers)
487 # We favor lower numbered shares, since FEC is faster with
488 # primary shares than with other shares, and lower-numbered
489 # shares are more likely to be primary than higher numbered
491 new_shnums = sorted(unused_shnums)[:more]
492 if len(new_shnums) < more:
493 # We don't have enough readers to retrieve the file; fail.
494 self._raise_notenoughshareserror()
495 else:
496 new_shnums = []
498 self.log("adding %d new servers to the active list" % len(new_shnums))
499 for shnum in new_shnums:
500 reader = self.readers[shnum]
501 self._active_readers.append(reader)
502 self.log("added reader for share %d" % shnum)
503 # Each time we add a reader, we check to see if we need the
504 # private key. If we do, we politely ask for it and then continue
505 # computing. If we find that we haven't gotten it at the end of
506 # segment decoding, then we'll take more drastic measures.
507 if self._need_privkey and not self._node.is_readonly():
508 d = reader.get_encprivkey()
509 d.addCallback(self._try_to_validate_privkey, reader, reader.server)
510 # XXX: don't just drop the Deferred. We need error-reporting
511 # but not flow-control here.
513 def _try_to_validate_prefix(self, prefix, reader):
515 I check that the prefix returned by a candidate server for
516 retrieval matches the prefix that the servermap knows about
517 (and, hence, the prefix that was validated earlier). If it does,
518 I return True, which means that I approve of the use of the
519 candidate server for segment retrieval. If it doesn't, I return
520 False, which means that another server must be chosen.
530 offsets_tuple) = self.verinfo
531 if known_prefix != prefix:
532 self.log("prefix from share %d doesn't match" % reader.shnum)
533 raise UncoordinatedWriteError("Mismatched prefix -- this could "
534 "indicate an uncoordinated write")
535 # Otherwise, we're okay -- no issues.
537 def _mark_bad_share(self, server, shnum, reader, f):
539 I mark the given (server, shnum) as a bad share, which means that it
540 will not be used anywhere else.
542 There are several reasons to want to mark something as a bad
543 share. These include:
545 - A connection error to the server.
546 - A mismatched prefix (that is, a prefix that does not match
547 our local conception of the version information string).
548 - A failing block hash, salt hash, share hash, or other
551 This method will ensure that readers that we wish to mark bad
552 (for these reasons or other reasons) are not used for the rest
553 of the download. Additionally, it will attempt to tell the
554 remote server (with no guarantee of success) that its share is
557 self.log("marking share %d on server %s as bad" % \
558 (shnum, server.get_name()))
559 prefix = self.verinfo[-2]
560 self.servermap.mark_bad_share(server, shnum, prefix)
561 self._bad_shares.add((server, shnum, f))
562 self._status.add_problem(server, f)
563 self._last_failure = f
565 # Remove the reader from _active_readers
566 self._active_readers.remove(reader)
567 for shnum in list(self.remaining_sharemap.keys()):
568 self.remaining_sharemap.discard(shnum, reader.server)
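# (dropping the server from remaining_sharemap keeps this (shnum, server)
# pair out of the pool that _activate_enough_servers and the "remaining
# shares" accounting draw from)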
570 if f.check(BadShareError):
571 self.notify_server_corruption(server, shnum, str(f.value))
573 def _download_current_segment(self):
574 """
575 I download, validate, decode, decrypt, and assemble the segment
576 that this Retrieve is currently responsible for downloading.
577 """
578 if self._current_segment > self._last_segment:
579 # No more segments to download, we're done.
580 self.log("got plaintext, done")
581 return self._done()
582 elif self._verify and len(self._active_readers) == 0:
583 self.log("no more good shares, no need to keep verifying")
584 return self._done()
585 self.log("on segment %d of %d" %
586 (self._current_segment + 1, self._num_segments))
587 d = self._process_segment(self._current_segment)
588 d.addCallback(lambda ign: self.loop())
589 return d
591 def _process_segment(self, segnum):
592 """
593 I download, validate, decode, and decrypt one segment of the
594 file that this Retrieve is retrieving. This means coordinating
595 the process of getting k blocks of that file, validating them,
596 assembling them into one segment with the decoder, and then
597 decrypting it.
598 """
599 self.log("processing segment %d" % segnum)
601 # TODO: The old code uses a marker. Should this code do that
602 # too? What did the Marker do?
604 # We need to ask each of our active readers for its block and
605 # salt. We will then validate those. If validation is
606 # successful, we will assemble the results into plaintext.
607 ds = []
608 for reader in self._active_readers:
609 started = time.time()
610 d1 = reader.get_block_and_salt(segnum)
611 d2,d3 = self._get_needed_hashes(reader, segnum)
612 d = deferredutil.gatherResults([d1,d2,d3])
613 d.addCallback(self._validate_block, segnum, reader, reader.server, started)
614 # _handle_bad_share takes care of recoverable errors (by dropping
615 # that share and returning None). Any other errors (i.e. code
616 # bugs) are passed through and cause the retrieve to fail.
617 d.addErrback(self._handle_bad_share, [reader])
618 ds.append(d)
619 dl = deferredutil.gatherResults(ds)
620 if self._verify:
621 dl.addCallback(lambda ignored: "")
622 dl.addCallback(self._set_segment)
623 else:
624 dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
625 return dl
628 def _maybe_decode_and_decrypt_segment(self, results, segnum):
630 I take the results of fetching and validating the blocks from
631 _process_segment. If validation and fetching succeeded without
632 incident, I will proceed with decoding and decryption. Otherwise, I
635 self.log("trying to decode and decrypt segment %d" % segnum)
637 # 'results' is the output of a gatherResults set up in
638 # _process_segment(). Each component Deferred will either contain the
639 # non-Failure output of _validate_block() for a single block (i.e.
640 # {segnum:(block,salt)}), or None if _validate_block threw an
641 # exception and _validation_or_decoding_failed handled it (by
642 # dropping that server).
644 if None in results:
645 self.log("some validation operations failed; not proceeding")
646 return defer.succeed(None)
647 self.log("everything looks ok, building segment %d" % segnum)
648 d = self._decode_blocks(results, segnum)
649 d.addCallback(self._decrypt_segment)
650 # check to see whether we've been paused before writing
652 d.addCallback(self._check_for_paused)
653 d.addCallback(self._check_for_stopped)
654 d.addCallback(self._set_segment)
655 return d
658 def _set_segment(self, segment):
660 Given a plaintext segment, I register that segment with the
661 target that is handling the file download.
663 self.log("got plaintext for segment %d" % self._current_segment)
664 if self._current_segment == self._start_segment:
665 # We're on the first segment. It's possible that we want
666 # only some part of the end of this segment, and that we
667 # just downloaded the whole thing to get that part. If so,
668 # we need to account for that and give the reader just the
669 # data that they want.
670 n = self._offset % self._segment_size
671 self.log("stripping %d bytes off of the first segment" % n)
672 self.log("original segment length: %d" % len(segment))
673 segment = segment[n:]
674 self.log("new segment length: %d" % len(segment))
676 if self._current_segment == self._last_segment and self._read_length is not None:
677 # We're on the last segment. It's possible that we only want
678 # part of the beginning of this segment, and that we
679 # downloaded the whole thing anyway. Make sure to give the
680 # caller only the portion of the segment that they want to
682 extra = self._read_length
683 if self._start_segment != self._last_segment:
684 extra -= self._segment_size - \
685 (self._offset % self._segment_size)
686 extra %= self._segment_size
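# Continuing the illustrative numbers from above: when offset=650 and
# read_length=200 fall inside a single 300-byte segment, extra stays 200, so
# after the front of the segment was stripped we keep segment[:200] --
# exactly read_length bytes.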
687 self.log("original segment length: %d" % len(segment))
688 segment = segment[:extra]
689 self.log("new segment length: %d" % len(segment))
690 self.log("only taking %d bytes of the last segment" % extra)
692 if not self._verify:
693 self._consumer.write(segment)
694 else:
695 # we don't care about the plaintext if we are doing a verify.
696 segment = None
697 self._current_segment += 1
700 def _handle_bad_share(self, f, readers):
702 I am called when a block or a salt fails to correctly validate, or when
703 the decryption or decoding operation fails for some reason. I react to
704 this failure by notifying the remote server of corruption, and then
705 removing the remote server from further activity.
707 # these are the errors we can tolerate: by giving up on this share
708 # and finding others to replace it. Any other errors (i.e. coding
709 # bugs) are re-raised, causing the download to fail.
710 f.trap(DeadReferenceError, RemoteException, BadShareError)
712 # DeadReferenceError happens when we try to fetch data from a server
713 # that has gone away. RemoteException happens if the server had an
714 # internal error. BadShareError encompasses: (UnknownVersionError,
715 # LayoutInvalid, struct.error) which happen when we get obviously
716 # wrong data, and CorruptShareError which happens later, when we
717 # perform integrity checks on the data.
719 assert isinstance(readers, list)
720 bad_shnums = [reader.shnum for reader in readers]
722 self.log("validation or decoding failed on share(s) %s, server(s) %s "
723 ", segment %d: %s" % \
724 (bad_shnums, readers, self._current_segment, str(f)))
725 for reader in readers:
726 self._mark_bad_share(reader.server, reader.shnum, reader, f)
730 def _validate_block(self, results, segnum, reader, server, started):
732 I validate a block from one share on a remote server.
734 # Grab the part of the block hash tree that is necessary to
735 # validate this block, then generate the block hash root.
736 self.log("validating share %d for segment %d" % (reader.shnum,
737 segnum))
738 elapsed = time.time() - started
739 self._status.add_fetch_timing(server, elapsed)
740 self._set_current_status("validating blocks")
742 block_and_salt, blockhashes, sharehashes = results
743 block, salt = block_and_salt
744 assert type(block) is str, (block, salt)
746 blockhashes = dict(enumerate(blockhashes))
747 self.log("the reader gave me the following blockhashes: %s" % \
748 blockhashes.keys())
749 self.log("the reader gave me the following sharehashes: %s" % \
750 sharehashes.keys())
751 bht = self._block_hash_trees[reader.shnum]
753 if bht.needed_hashes(segnum, include_leaf=True):
754 try:
755 bht.set_hashes(blockhashes)
756 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
757 IndexError), e:
758 raise CorruptShareError(server,
759 reader.shnum,
760 "block hash tree failure: %s" % e)
762 if self._version == MDMF_VERSION:
763 blockhash = hashutil.block_hash(salt + block)
764 else:
765 blockhash = hashutil.block_hash(block)
766 # If this works without an error, then validation is
767 # successful.
768 try:
769 bht.set_hashes(leaves={segnum: blockhash})
770 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
771 IndexError), e:
772 raise CorruptShareError(server,
773 reader.shnum,
774 "block hash tree failure: %s" % e)
776 # Reaching this point means that we know that this segment
777 # is correct. Now we need to check to see whether the share
778 # hash chain is also correct.
779 # SDMF wrote share hash chains that didn't contain the
780 # leaves, which would be produced from the block hash tree.
781 # So we need to validate the block hash tree first. If
782 # successful, then bht[0] will contain the root for the
783 # shnum, which will be a leaf in the share hash tree, which
784 # will allow us to validate the rest of the tree.
785 try:
786 self.share_hash_tree.set_hashes(hashes=sharehashes,
787 leaves={reader.shnum: bht[0]})
788 except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
789 IndexError), e:
790 raise CorruptShareError(server,
791 reader.shnum,
792 "corrupt hashes: %s" % e)
794 self.log('share %d is valid for segment %d' % (reader.shnum,
795 segnum))
796 return {reader.shnum: (block, salt)}
799 def _get_needed_hashes(self, reader, segnum):
801 I get the hashes needed to validate segnum from the reader, then return
802 to my caller when this is done.
804 bht = self._block_hash_trees[reader.shnum]
805 needed = bht.needed_hashes(segnum, include_leaf=True)
806 # The root of the block hash tree is also a leaf in the share
807 # hash tree. So we don't need to fetch it from the remote
808 # server. In the case of files with one segment, this means that
809 # we won't fetch any block hash tree from the remote server,
810 # since the hash of each share of the file is the entire block
811 # hash tree, and is a leaf in the share hash tree. This is fine,
812 # since any share corruption will be detected in the share hash
813 # tree.
815 self.log("getting blockhashes for segment %d, share %d: %s" % \
816 (segnum, reader.shnum, str(needed)))
817 # TODO is force_remote necessary here?
818 d1 = reader.get_blockhashes(needed, force_remote=False)
819 if self.share_hash_tree.needed_hashes(reader.shnum):
820 need = self.share_hash_tree.needed_hashes(reader.shnum)
821 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
822 str(need)))
823 d2 = reader.get_sharehashes(need, force_remote=False)
824 else:
825 d2 = defer.succeed({}) # the logic in the next method
826 # expects a dict
828 return d1,d2
830 def _decode_blocks(self, results, segnum):
832 I take a list of k blocks and salts, and decode that into a
833 single encrypted segment.
835 # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
836 # want to merge them all
837 blocks_and_salts = {}
838 for d in results:
839 blocks_and_salts.update(d)
841 # All of these blocks should have the same salt; in SDMF, it is
842 # the file-wide IV, while in MDMF it is the per-segment salt. In
843 # either case, we just need to get one of them and use it.
845 # d.items()[0] is like (shnum, (block, salt))
846 # d.items()[0][1] is like (block, salt)
847 # d.items()[0][1][1] is the salt.
848 salt = blocks_and_salts.items()[0][1][1]
849 # Next, extract just the blocks from the dict. We'll use the
850 # salt in the next step.
851 share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
852 d2 = dict(share_and_shareids)
853 shareids = []
854 shares = []
855 for shareid, share in d2.items():
856 shareids.append(shareid)
857 shares.append(share)
859 self._set_current_status("decoding")
860 started = time.time()
861 assert len(shareids) >= self._required_shares, len(shareids)
862 # zfec really doesn't want extra shares
863 shareids = shareids[:self._required_shares]
864 shares = shares[:self._required_shares]
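# (zfec reconstructs the segment from exactly k blocks; which k we use does
# not matter as long as shareids tells the decoder which share each block
# came from)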
865 self.log("decoding segment %d" % segnum)
866 if segnum == self._num_segments - 1:
867 d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
868 else:
869 d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
870 def _process(buffers):
871 segment = "".join(buffers)
872 self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
873 segnum=segnum,
874 numsegs=self._num_segments,
875 level=log.NOISY)
876 self.log(" joined length %d, datalength %d" %
877 (len(segment), self._data_length))
878 if segnum == self._num_segments - 1:
879 size_to_use = self._tail_data_size
880 else:
881 size_to_use = self._segment_size
882 segment = segment[:size_to_use]
883 self.log(" segment len=%d" % len(segment))
884 self._status.accumulate_decode_time(time.time() - started)
885 return (segment, salt)
886 d.addCallback(_process)
887 return d
890 def _decrypt_segment(self, segment_and_salt):
892 I take a single segment and its salt, and decrypt it. I return
893 the plaintext of the segment that is in my argument.
895 segment, salt = segment_and_salt
896 self._set_current_status("decrypting")
897 self.log("decrypting segment %d" % self._current_segment)
898 started = time.time()
899 key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
900 decryptor = AES(key)
901 plaintext = decryptor.process(segment)
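# Note: pycryptopp's AES object acts as a stream cipher (CTR mode) here, and
# the key mixes the readkey with this segment's salt -- the file-wide IV for
# SDMF, a per-segment salt for MDMF -- so each segment gets its own keystream.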
902 self._status.accumulate_decrypt_time(time.time() - started)
903 return plaintext
906 def notify_server_corruption(self, server, shnum, reason):
907 rref = server.get_rref()
908 rref.callRemoteOnly("advise_corrupt_share",
909 "mutable", self._storage_index, shnum, reason)
912 def _try_to_validate_privkey(self, enc_privkey, reader, server):
913 alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
914 alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
915 if alleged_writekey != self._node.get_writekey():
916 self.log("invalid privkey from %s shnum %d" %
917 (reader, reader.shnum),
918 level=log.WEIRD, umid="YIw4tA")
919 if self._verify:
920 self.servermap.mark_bad_share(server, reader.shnum,
921 self.verinfo[-2])
922 e = CorruptShareError(server,
923 reader.shnum,
924 "invalid privkey")
925 f = failure.Failure(e)
926 self._bad_shares.add((server, reader.shnum, f))
927 return
930 self.log("got valid privkey from shnum %d on reader %s" %
931 (reader.shnum, reader))
932 privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
933 self._node._populate_encprivkey(enc_privkey)
934 self._node._populate_privkey(privkey)
935 self._need_privkey = False
939 def _done(self):
940 """
941 I am called by _download_current_segment when the download process
942 has finished successfully. After making some useful logging
943 statements, I return the decrypted contents to the owner of this
944 Retrieve object through self._done_deferred.
945 """
946 self._running = False
947 self._status.set_active(False)
948 now = time.time()
949 self._status.timings['total'] = now - self._started
950 self._status.timings['fetch'] = now - self._started_fetching
951 self._status.set_status("Finished")
952 self._status.set_progress(1.0)
954 # remember the encoding parameters, use them again next time
955 (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
956 offsets_tuple) = self.verinfo
957 self._node._populate_required_shares(k)
958 self._node._populate_total_shares(N)
960 if self._verify:
961 ret = self._bad_shares
962 self.log("done verifying, found %d bad shares" % len(ret))
963 else:
964 # TODO: upload status here?
965 ret = self._consumer
966 self._consumer.unregisterProducer()
967 eventually(self._done_deferred.callback, ret)
970 def _raise_notenoughshareserror(self):
971 """
972 I am called when there are not enough active servers left to complete
973 the download. After making some useful logging statements, I throw an
974 exception to that effect to the caller of this Retrieve object through
975 self._done_deferred.
976 """
978 format = ("ran out of servers: "
979 "have %(have)d of %(total)d segments; "
980 "found %(bad)d bad shares; "
981 "have %(remaining)d remaining shares of the right version; "
982 "encoding %(k)d-of-%(n)d")
983 args = {"have": self._current_segment,
984 "total": self._num_segments,
985 "need": self._last_segment,
986 "k": self._required_shares,
987 "n": self._total_shares,
988 "bad": len(self._bad_shares),
989 "remaining": len(self.remaining_sharemap),
991 raise NotEnoughSharesError("%s, last failure: %s" %
992 (format % args, str(self._last_failure)))
994 def _error(self, f):
995 # all errors, including NotEnoughSharesError, land here
996 self._running = False
997 self._status.set_active(False)
998 now = time.time()
999 self._status.timings['total'] = now - self._started
1000 self._status.timings['fetch'] = now - self._started_fetching
1001 self._status.set_status("Failed")
1002 eventually(self._done_deferred.errback, f)