]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/retrieve.py
291fb120c9ab1f86c6f5547c81679980b3fd0a20
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
1
2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually, DeadReferenceError, \
9      RemoteException
10 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
11      DownloadStopped, MDMF_VERSION, SDMF_VERSION
12 from allmydata.util import hashutil, log, mathutil, deferredutil
13 from allmydata.util.dictutil import DictOfSets
14 from allmydata import hashtree, codec
15 from allmydata.storage.server import si_b2a
16 from pycryptopp.cipher.aes import AES
17 from pycryptopp.publickey import rsa
18
19 from allmydata.mutable.common import CorruptShareError, BadShareError, \
20      UncoordinatedWriteError
21 from allmydata.mutable.layout import MDMFSlotReadProxy
22
23 class RetrieveStatus:
24     implements(IRetrieveStatus)
25     statusid_counter = count(0)
26     def __init__(self):
27         self.timings = {}
28         self.timings["fetch_per_server"] = {}
29         self.timings["decode"] = 0.0
30         self.timings["decrypt"] = 0.0
31         self.timings["cumulative_verify"] = 0.0
32         self._problems = {}
33         self.active = True
34         self.storage_index = None
35         self.helper = False
36         self.encoding = ("?","?")
37         self.size = None
38         self.status = "Not started"
39         self.progress = 0.0
40         self.counter = self.statusid_counter.next()
41         self.started = time.time()
42
43     def get_started(self):
44         return self.started
45     def get_storage_index(self):
46         return self.storage_index
47     def get_encoding(self):
48         return self.encoding
49     def using_helper(self):
50         return self.helper
51     def get_size(self):
52         return self.size
53     def get_status(self):
54         return self.status
55     def get_progress(self):
56         return self.progress
57     def get_active(self):
58         return self.active
59     def get_counter(self):
60         return self.counter
61     def get_problems(self):
62         return self._problems
63
64     def add_fetch_timing(self, server, elapsed):
65         if server not in self.timings["fetch_per_server"]:
66             self.timings["fetch_per_server"][server] = []
67         self.timings["fetch_per_server"][server].append(elapsed)
68     def accumulate_decode_time(self, elapsed):
69         self.timings["decode"] += elapsed
70     def accumulate_decrypt_time(self, elapsed):
71         self.timings["decrypt"] += elapsed
72     def set_storage_index(self, si):
73         self.storage_index = si
74     def set_helper(self, helper):
75         self.helper = helper
76     def set_encoding(self, k, n):
77         self.encoding = (k, n)
78     def set_size(self, size):
79         self.size = size
80     def set_status(self, status):
81         self.status = status
82     def set_progress(self, value):
83         self.progress = value
84     def set_active(self, value):
85         self.active = value
86     def add_problem(self, server, f):
87         serverid = server.get_serverid()
88         self._problems[serverid] = f
89
90 class Marker:
91     pass
92
93 class Retrieve:
94     # this class is currently single-use. Eventually (in MDMF) we will make
95     # it multi-use, in which case you can call download(range) multiple
96     # times, and each will have a separate response chain. However the
97     # Retrieve object will remain tied to a specific version of the file, and
98     # will use a single ServerMap instance.
99     implements(IPushProducer)
100
101     def __init__(self, filenode, storage_broker, servermap, verinfo,
102                  fetch_privkey=False, verify=False):
103         self._node = filenode
104         assert self._node.get_pubkey()
105         self._storage_broker = storage_broker
106         self._storage_index = filenode.get_storage_index()
107         assert self._node.get_readkey()
108         self._last_failure = None
109         prefix = si_b2a(self._storage_index)[:5]
110         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
111         self._running = True
112         self._decoding = False
113         self._bad_shares = set()
114
115         self.servermap = servermap
116         assert self._node.get_pubkey()
117         self.verinfo = verinfo
118         # during repair, we may be called upon to grab the private key, since
119         # it wasn't picked up during a verify=False checker run, and we'll
120         # need it for repair to generate a new version.
121         self._need_privkey = verify or (fetch_privkey
122                                         and not self._node.get_privkey())
123
124         if self._need_privkey:
125             # TODO: Evaluate the need for this. We'll use it if we want
126             # to limit how many queries are on the wire for the privkey
127             # at once.
128             self._privkey_query_markers = [] # one Marker for each time we've
129                                              # tried to get the privkey.
130
131         # verify means that we are using the downloader logic to verify all
132         # of our shares. This tells the downloader a few things.
133         # 
134         # 1. We need to download all of the shares.
135         # 2. We don't need to decode or decrypt the shares, since our
136         #    caller doesn't care about the plaintext, only the
137         #    information about which shares are or are not valid.
138         # 3. When we are validating readers, we need to validate the
139         #    signature on the prefix. Do we? We already do this in the
140         #    servermap update?
141         self._verify = verify
142
143         self._status = RetrieveStatus()
144         self._status.set_storage_index(self._storage_index)
145         self._status.set_helper(False)
146         self._status.set_progress(0.0)
147         self._status.set_active(True)
148         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
149          offsets_tuple) = self.verinfo
150         self._status.set_size(datalength)
151         self._status.set_encoding(k, N)
152         self.readers = {}
153         self._stopped = False
154         self._pause_deferred = None
155         self._offset = None
156         self._read_length = None
157         self.log("got seqnum %d" % self.verinfo[0])
158
159
160     def get_status(self):
161         return self._status
162
163     def log(self, *args, **kwargs):
164         if "parent" not in kwargs:
165             kwargs["parent"] = self._log_number
166         if "facility" not in kwargs:
167             kwargs["facility"] = "tahoe.mutable.retrieve"
168         return log.msg(*args, **kwargs)
169
170     def _set_current_status(self, state):
171         seg = "%d/%d" % (self._current_segment, self._last_segment)
172         self._status.set_status("segment %s (%s)" % (seg, state))
173
174     ###################
175     # IPushProducer
176
177     def pauseProducing(self):
178         """
179         I am called by my download target if we have produced too much
180         data for it to handle. I make the downloader stop producing new
181         data until my resumeProducing method is called.
182         """
183         if self._pause_deferred is not None:
184             return
185
186         # fired when the download is unpaused.
187         self._old_status = self._status.get_status()
188         self._set_current_status("paused")
189
190         self._pause_deferred = defer.Deferred()
191
192
193     def resumeProducing(self):
194         """
195         I am called by my download target once it is ready to begin
196         receiving data again.
197         """
198         if self._pause_deferred is None:
199             return
200
201         p = self._pause_deferred
202         self._pause_deferred = None
203         self._status.set_status(self._old_status)
204
205         eventually(p.callback, None)
206
207     def stopProducing(self):
208         self._stopped = True
209         self.resumeProducing()
210
211
212     def _check_for_paused(self, res):
213         """
214         I am called just before a write to the consumer. I return a
215         Deferred that eventually fires with the data that is to be
216         written to the consumer. If the download has not been paused,
217         the Deferred fires immediately. Otherwise, the Deferred fires
218         when the downloader is unpaused.
219         """
220         if self._pause_deferred is not None:
221             d = defer.Deferred()
222             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
223             return d
224         return res
225
226     def _check_for_stopped(self, res):
227         if self._stopped:
228             raise DownloadStopped("our Consumer called stopProducing()")
229         return res
230
231
232     def download(self, consumer=None, offset=0, size=None):
233         assert IConsumer.providedBy(consumer) or self._verify
234
235         if consumer:
236             self._consumer = consumer
237             # we provide IPushProducer, so streaming=True, per
238             # IConsumer.
239             self._consumer.registerProducer(self, streaming=True)
240
241         self._done_deferred = defer.Deferred()
242         self._offset = offset
243         self._read_length = size
244         self._setup_download()
245         self._setup_encoding_parameters()
246         self.log("starting download")
247         self._started_fetching = time.time()
248         # The download process beyond this is a state machine.
249         # _add_active_servers will select the servers that we want to use
250         # for the download, and then attempt to start downloading. After
251         # each segment, it will check for doneness, reacting to broken
252         # servers and corrupt shares as necessary. If it runs out of good
253         # servers before downloading all of the segments, _done_deferred
254         # will errback.  Otherwise, it will eventually callback with the
255         # contents of the mutable file.
256         self.loop()
257         return self._done_deferred
258
259     def loop(self):
260         d = fireEventually(None) # avoid #237 recursion limit problem
261         d.addCallback(lambda ign: self._activate_enough_servers())
262         d.addCallback(lambda ign: self._download_current_segment())
263         # when we're done, _download_current_segment will call _done. If we
264         # aren't, it will call loop() again.
265         d.addErrback(self._error)
266
267     def _setup_download(self):
268         self._started = time.time()
269         self._status.set_status("Retrieving Shares")
270
271         # how many shares do we need?
272         (seqnum,
273          root_hash,
274          IV,
275          segsize,
276          datalength,
277          k,
278          N,
279          prefix,
280          offsets_tuple) = self.verinfo
281
282         # first, which servers can we use?
283         versionmap = self.servermap.make_versionmap()
284         shares = versionmap[self.verinfo]
285         # this sharemap is consumed as we decide to send requests
286         self.remaining_sharemap = DictOfSets()
287         for (shnum, server, timestamp) in shares:
288             self.remaining_sharemap.add(shnum, server)
289             # If the servermap update fetched anything, it fetched at least 1
290             # KiB, so we ask for that much.
291             # TODO: Change the cache methods to allow us to fetch all of the
292             # data that they have, then change this method to do that.
293             any_cache = self._node._read_from_cache(self.verinfo, shnum,
294                                                     0, 1000)
295             reader = MDMFSlotReadProxy(server.get_rref(),
296                                        self._storage_index,
297                                        shnum,
298                                        any_cache)
299             reader.server = server
300             self.readers[shnum] = reader
301         assert len(self.remaining_sharemap) >= k
302
303         self.shares = {} # maps shnum to validated blocks
304         self._active_readers = [] # list of active readers for this dl.
305         self._block_hash_trees = {} # shnum => hashtree
306
307         # We need one share hash tree for the entire file; its leaves
308         # are the roots of the block hash trees for the shares that
309         # comprise it, and its root is in the verinfo.
310         self.share_hash_tree = hashtree.IncompleteHashTree(N)
311         self.share_hash_tree.set_hashes({0: root_hash})
312
313     def decode(self, blocks_and_salts, segnum):
314         """
315         I am a helper method that the mutable file update process uses
316         as a shortcut to decode and decrypt the segments that it needs
317         to fetch in order to perform a file update. I take in a
318         collection of blocks and salts, and pick some of those to make a
319         segment with. I return the plaintext associated with that
320         segment.
321         """
322         # shnum => block hash tree. Unused, but setup_encoding_parameters will
323         # want to set this.
324         self._block_hash_trees = None
325         self._setup_encoding_parameters()
326
327         # _decode_blocks() expects the output of a gatherResults that
328         # contains the outputs of _validate_block() (each of which is a dict
329         # mapping shnum to (block,salt) bytestrings).
330         d = self._decode_blocks([blocks_and_salts], segnum)
331         d.addCallback(self._decrypt_segment)
332         return d
333
334
335     def _setup_encoding_parameters(self):
336         """
337         I set up the encoding parameters, including k, n, the number
338         of segments associated with this file, and the segment decoders.
339         """
340         (seqnum,
341          root_hash,
342          IV,
343          segsize,
344          datalength,
345          k,
346          n,
347          known_prefix,
348          offsets_tuple) = self.verinfo
349         self._required_shares = k
350         self._total_shares = n
351         self._segment_size = segsize
352         self._data_length = datalength
353
354         if not IV:
355             self._version = MDMF_VERSION
356         else:
357             self._version = SDMF_VERSION
358
359         if datalength and segsize:
360             self._num_segments = mathutil.div_ceil(datalength, segsize)
361             self._tail_data_size = datalength % segsize
362         else:
363             self._num_segments = 0
364             self._tail_data_size = 0
365
366         self._segment_decoder = codec.CRSDecoder()
367         self._segment_decoder.set_params(segsize, k, n)
368
369         if  not self._tail_data_size:
370             self._tail_data_size = segsize
371
372         self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
373                                                          self._required_shares)
374         if self._tail_segment_size == self._segment_size:
375             self._tail_decoder = self._segment_decoder
376         else:
377             self._tail_decoder = codec.CRSDecoder()
378             self._tail_decoder.set_params(self._tail_segment_size,
379                                           self._required_shares,
380                                           self._total_shares)
381
382         self.log("got encoding parameters: "
383                  "k: %d "
384                  "n: %d "
385                  "%d segments of %d bytes each (%d byte tail segment)" % \
386                  (k, n, self._num_segments, self._segment_size,
387                   self._tail_segment_size))
388
389         if self._block_hash_trees is not None:
390             for i in xrange(self._total_shares):
391                 # So we don't have to do this later.
392                 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
393
394         # Our last task is to tell the downloader where to start and
395         # where to stop. We use three parameters for that:
396         #   - self._start_segment: the segment that we need to start
397         #     downloading from. 
398         #   - self._current_segment: the next segment that we need to
399         #     download.
400         #   - self._last_segment: The last segment that we were asked to
401         #     download.
402         #
403         #  We say that the download is complete when
404         #  self._current_segment > self._last_segment. We use
405         #  self._start_segment and self._last_segment to know when to
406         #  strip things off of segments, and how much to strip.
407         if self._offset:
408             self.log("got offset: %d" % self._offset)
409             # our start segment is the first segment containing the
410             # offset we were given. 
411             start = self._offset // self._segment_size
412
413             assert start < self._num_segments
414             self._start_segment = start
415             self.log("got start segment: %d" % self._start_segment)
416         else:
417             self._start_segment = 0
418
419
420         # If self._read_length is None, then we want to read the whole
421         # file. Otherwise, we want to read only part of the file, and
422         # need to figure out where to stop reading.
423         if self._read_length is not None:
424             # our end segment is the last segment containing part of the
425             # segment that we were asked to read.
426             self.log("got read length %d" % self._read_length)
427             if self._read_length != 0:
428                 end_data = self._offset + self._read_length
429
430                 # We don't actually need to read the byte at end_data,
431                 # but the one before it.
432                 end = (end_data - 1) // self._segment_size
433
434                 assert end < self._num_segments
435                 self._last_segment = end
436             else:
437                 self._last_segment = self._start_segment
438             self.log("got end segment: %d" % self._last_segment)
439         else:
440             self._last_segment = self._num_segments - 1
441
442         self._current_segment = self._start_segment
443
444     def _activate_enough_servers(self):
445         """
446         I populate self._active_readers with enough active readers to
447         retrieve the contents of this mutable file. I am called before
448         downloading starts, and (eventually) after each validation
449         error, connection error, or other problem in the download.
450         """
451         # TODO: It would be cool to investigate other heuristics for
452         # reader selection. For instance, the cost (in time the user
453         # spends waiting for their file) of selecting a really slow server
454         # that happens to have a primary share is probably more than
455         # selecting a really fast server that doesn't have a primary
456         # share. Maybe the servermap could be extended to provide this
457         # information; it could keep track of latency information while
458         # it gathers more important data, and then this routine could
459         # use that to select active readers.
460         #
461         # (these and other questions would be easier to answer with a
462         #  robust, configurable tahoe-lafs simulator, which modeled node
463         #  failures, differences in node speed, and other characteristics
464         #  that we expect storage servers to have.  You could have
465         #  presets for really stable grids (like allmydata.com),
466         #  friendnets, make it easy to configure your own settings, and
467         #  then simulate the effect of big changes on these use cases
468         #  instead of just reasoning about what the effect might be. Out
469         #  of scope for MDMF, though.)
470
471         # XXX: Why don't format= log messages work here?
472
473         known_shnums = set(self.remaining_sharemap.keys())
474         used_shnums = set([r.shnum for r in self._active_readers])
475         unused_shnums = known_shnums - used_shnums
476
477         if self._verify:
478             new_shnums = unused_shnums # use them all
479         elif len(self._active_readers) < self._required_shares:
480             # need more shares
481             more = self._required_shares - len(self._active_readers)
482             # We favor lower numbered shares, since FEC is faster with
483             # primary shares than with other shares, and lower-numbered
484             # shares are more likely to be primary than higher numbered
485             # shares.
486             new_shnums = sorted(unused_shnums)[:more]
487             if len(new_shnums) < more:
488                 # We don't have enough readers to retrieve the file; fail.
489                 self._raise_notenoughshareserror()
490         else:
491             new_shnums = []
492
493         self.log("adding %d new servers to the active list" % len(new_shnums))
494         for shnum in new_shnums:
495             reader = self.readers[shnum]
496             self._active_readers.append(reader)
497             self.log("added reader for share %d" % shnum)
498             # Each time we add a reader, we check to see if we need the
499             # private key. If we do, we politely ask for it and then continue
500             # computing. If we find that we haven't gotten it at the end of
501             # segment decoding, then we'll take more drastic measures.
502             if self._need_privkey and not self._node.is_readonly():
503                 d = reader.get_encprivkey()
504                 d.addCallback(self._try_to_validate_privkey, reader, reader.server)
505                 # XXX: don't just drop the Deferred. We need error-reporting
506                 # but not flow-control here.
507
508     def _try_to_validate_prefix(self, prefix, reader):
509         """
510         I check that the prefix returned by a candidate server for
511         retrieval matches the prefix that the servermap knows about
512         (and, hence, the prefix that was validated earlier). If it does,
513         I return True, which means that I approve of the use of the
514         candidate server for segment retrieval. If it doesn't, I return
515         False, which means that another server must be chosen.
516         """
517         (seqnum,
518          root_hash,
519          IV,
520          segsize,
521          datalength,
522          k,
523          N,
524          known_prefix,
525          offsets_tuple) = self.verinfo
526         if known_prefix != prefix:
527             self.log("prefix from share %d doesn't match" % reader.shnum)
528             raise UncoordinatedWriteError("Mismatched prefix -- this could "
529                                           "indicate an uncoordinated write")
530         # Otherwise, we're okay -- no issues.
531
532
533     def _remove_reader(self, reader):
534         """
535         At various points, we will wish to remove a server from
536         consideration and/or use. These include, but are not necessarily
537         limited to:
538
539             - A connection error.
540             - A mismatched prefix (that is, a prefix that does not match
541               our conception of the version information string).
542             - A failing block hash, salt hash, or share hash, which can
543               indicate disk failure/bit flips, or network trouble.
544
545         This method will do that. I will make sure that the
546         (shnum,reader) combination represented by my reader argument is
547         not used for anything else during this download. I will not
548         advise the reader of any corruption, something that my callers
549         may wish to do on their own.
550         """
551         # TODO: When you're done writing this, see if this is ever
552         # actually used for something that _mark_bad_share isn't. I have
553         # a feeling that they will be used for very similar things, and
554         # that having them both here is just going to be an epic amount
555         # of code duplication.
556         #
557         # (well, okay, not epic, but meaningful)
558         self.log("removing reader %s" % reader)
559         # Remove the reader from _active_readers
560         self._active_readers.remove(reader)
561         # TODO: self.readers.remove(reader)?
562         for shnum in list(self.remaining_sharemap.keys()):
563             self.remaining_sharemap.discard(shnum, reader.server)
564
565
566     def _mark_bad_share(self, server, shnum, reader, f):
567         """
568         I mark the given (server, shnum) as a bad share, which means that it
569         will not be used anywhere else.
570
571         There are several reasons to want to mark something as a bad
572         share. These include:
573
574             - A connection error to the server.
575             - A mismatched prefix (that is, a prefix that does not match
576               our local conception of the version information string).
577             - A failing block hash, salt hash, share hash, or other
578               integrity check.
579
580         This method will ensure that readers that we wish to mark bad
581         (for these reasons or other reasons) are not used for the rest
582         of the download. Additionally, it will attempt to tell the
583         remote server (with no guarantee of success) that its share is
584         corrupt.
585         """
586         self.log("marking share %d on server %s as bad" % \
587                  (shnum, server.get_name()))
588         prefix = self.verinfo[-2]
589         self.servermap.mark_bad_share(server, shnum, prefix)
590         self._remove_reader(reader)
591         self._bad_shares.add((server, shnum, f))
592         self._status.add_problem(server, f)
593         self._last_failure = f
594         if f.check(BadShareError):
595             self.notify_server_corruption(server, shnum, str(f.value))
596
597
598     def _download_current_segment(self):
599         """
600         I download, validate, decode, decrypt, and assemble the segment
601         that this Retrieve is currently responsible for downloading.
602         """
603         if self._current_segment > self._last_segment:
604             # No more segments to download, we're done.
605             self.log("got plaintext, done")
606             return self._done()
607         elif self._verify and len(self._active_readers) == 0:
608             self.log("no more good shares, no need to keep verifying")
609             return self._done()
610         self.log("on segment %d of %d" %
611                  (self._current_segment + 1, self._num_segments))
612         d = self._process_segment(self._current_segment)
613         d.addCallback(lambda ign: self.loop())
614         return d
615
616     def _process_segment(self, segnum):
617         """
618         I download, validate, decode, and decrypt one segment of the
619         file that this Retrieve is retrieving. This means coordinating
620         the process of getting k blocks of that file, validating them,
621         assembling them into one segment with the decoder, and then
622         decrypting them.
623         """
624         self.log("processing segment %d" % segnum)
625
626         # TODO: The old code uses a marker. Should this code do that
627         # too? What did the Marker do?
628
629         # We need to ask each of our active readers for its block and
630         # salt. We will then validate those. If validation is
631         # successful, we will assemble the results into plaintext.
632         ds = []
633         for reader in self._active_readers:
634             started = time.time()
635             d1 = reader.get_block_and_salt(segnum)
636             d2,d3 = self._get_needed_hashes(reader, segnum)
637             d = deferredutil.gatherResults([d1,d2,d3])
638             d.addCallback(self._validate_block, segnum, reader, reader.server, started)
639             # _handle_bad_share takes care of recoverable errors (by dropping
640             # that share and returning None). Any other errors (i.e. code
641             # bugs) are passed through and cause the retrieve to fail.
642             d.addErrback(self._handle_bad_share, [reader])
643             ds.append(d)
644         dl = deferredutil.gatherResults(ds)
645         if self._verify:
646             dl.addCallback(lambda ignored: "")
647             dl.addCallback(self._set_segment)
648         else:
649             dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
650         return dl
651
652
653     def _maybe_decode_and_decrypt_segment(self, results, segnum):
654         """
655         I take the results of fetching and validating the blocks from
656         _process_segment. If validation and fetching succeeded without
657         incident, I will proceed with decoding and decryption. Otherwise, I
658         will do nothing.
659         """
660         self.log("trying to decode and decrypt segment %d" % segnum)
661
662         # 'results' is the output of a gatherResults set up in
663         # _process_segment(). Each component Deferred will either contain the
664         # non-Failure output of _validate_block() for a single block (i.e.
665         # {segnum:(block,salt)}), or None if _validate_block threw an
666         # exception and _validation_or_decoding_failed handled it (by
667         # dropping that server).
668
669         if None in results:
670             self.log("some validation operations failed; not proceeding")
671             return defer.succeed(None)
672         self.log("everything looks ok, building segment %d" % segnum)
673         d = self._decode_blocks(results, segnum)
674         d.addCallback(self._decrypt_segment)
675         # check to see whether we've been paused before writing
676         # anything.
677         d.addCallback(self._check_for_paused)
678         d.addCallback(self._check_for_stopped)
679         d.addCallback(self._set_segment)
680         return d
681
682
683     def _set_segment(self, segment):
684         """
685         Given a plaintext segment, I register that segment with the
686         target that is handling the file download.
687         """
688         self.log("got plaintext for segment %d" % self._current_segment)
689         if self._current_segment == self._start_segment:
690             # We're on the first segment. It's possible that we want
691             # only some part of the end of this segment, and that we
692             # just downloaded the whole thing to get that part. If so,
693             # we need to account for that and give the reader just the
694             # data that they want.
695             n = self._offset % self._segment_size
696             self.log("stripping %d bytes off of the first segment" % n)
697             self.log("original segment length: %d" % len(segment))
698             segment = segment[n:]
699             self.log("new segment length: %d" % len(segment))
700
701         if self._current_segment == self._last_segment and self._read_length is not None:
702             # We're on the last segment. It's possible that we only want
703             # part of the beginning of this segment, and that we
704             # downloaded the whole thing anyway. Make sure to give the
705             # caller only the portion of the segment that they want to
706             # receive.
707             extra = self._read_length
708             if self._start_segment != self._last_segment:
709                 extra -= self._segment_size - \
710                             (self._offset % self._segment_size)
711             extra %= self._segment_size
712             self.log("original segment length: %d" % len(segment))
713             segment = segment[:extra]
714             self.log("new segment length: %d" % len(segment))
715             self.log("only taking %d bytes of the last segment" % extra)
716
717         if not self._verify:
718             self._consumer.write(segment)
719         else:
720             # we don't care about the plaintext if we are doing a verify.
721             segment = None
722         self._current_segment += 1
723
724
725     def _handle_bad_share(self, f, readers):
726         """
727         I am called when a block or a salt fails to correctly validate, or when
728         the decryption or decoding operation fails for some reason.  I react to
729         this failure by notifying the remote server of corruption, and then
730         removing the remote server from further activity.
731         """
732         # these are the errors we can tolerate: by giving up on this share
733         # and finding others to replace it. Any other errors (i.e. coding
734         # bugs) are re-raised, causing the download to fail.
735         f.trap(DeadReferenceError, RemoteException, BadShareError)
736
737         # DeadReferenceError happens when we try to fetch data from a server
738         # that has gone away. RemoteException happens if the server had an
739         # internal error. BadShareError encompasses: (UnknownVersionError,
740         # LayoutInvalid, struct.error) which happen when we get obviously
741         # wrong data, and CorruptShareError which happens later, when we
742         # perform integrity checks on the data.
743
744         assert isinstance(readers, list)
745         bad_shnums = [reader.shnum for reader in readers]
746
747         self.log("validation or decoding failed on share(s) %s, server(s) %s "
748                  ", segment %d: %s" % \
749                  (bad_shnums, readers, self._current_segment, str(f)))
750         for reader in readers:
751             self._mark_bad_share(reader.server, reader.shnum, reader, f)
752         return None
753
754
755     def _validate_block(self, results, segnum, reader, server, started):
756         """
757         I validate a block from one share on a remote server.
758         """
759         # Grab the part of the block hash tree that is necessary to
760         # validate this block, then generate the block hash root.
761         self.log("validating share %d for segment %d" % (reader.shnum,
762                                                              segnum))
763         elapsed = time.time() - started
764         self._status.add_fetch_timing(server, elapsed)
765         self._set_current_status("validating blocks")
766
767         block_and_salt, blockhashes, sharehashes = results
768         block, salt = block_and_salt
769
770         blockhashes = dict(enumerate(blockhashes))
771         self.log("the reader gave me the following blockhashes: %s" % \
772                  blockhashes.keys())
773         self.log("the reader gave me the following sharehashes: %s" % \
774                  sharehashes.keys())
775         bht = self._block_hash_trees[reader.shnum]
776
777         if bht.needed_hashes(segnum, include_leaf=True):
778             try:
779                 bht.set_hashes(blockhashes)
780             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
781                     IndexError), e:
782                 raise CorruptShareError(server,
783                                         reader.shnum,
784                                         "block hash tree failure: %s" % e)
785
786         if self._version == MDMF_VERSION:
787             blockhash = hashutil.block_hash(salt + block)
788         else:
789             blockhash = hashutil.block_hash(block)
790         # If this works without an error, then validation is
791         # successful.
792         try:
793            bht.set_hashes(leaves={segnum: blockhash})
794         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
795                 IndexError), e:
796             raise CorruptShareError(server,
797                                     reader.shnum,
798                                     "block hash tree failure: %s" % e)
799
800         # Reaching this point means that we know that this segment
801         # is correct. Now we need to check to see whether the share
802         # hash chain is also correct. 
803         # SDMF wrote share hash chains that didn't contain the
804         # leaves, which would be produced from the block hash tree.
805         # So we need to validate the block hash tree first. If
806         # successful, then bht[0] will contain the root for the
807         # shnum, which will be a leaf in the share hash tree, which
808         # will allow us to validate the rest of the tree.
809         try:
810             self.share_hash_tree.set_hashes(hashes=sharehashes,
811                                         leaves={reader.shnum: bht[0]})
812         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
813                 IndexError), e:
814             raise CorruptShareError(server,
815                                     reader.shnum,
816                                     "corrupt hashes: %s" % e)
817
818         self.log('share %d is valid for segment %d' % (reader.shnum,
819                                                        segnum))
820         return {reader.shnum: (block, salt)}
821
822
823     def _get_needed_hashes(self, reader, segnum):
824         """
825         I get the hashes needed to validate segnum from the reader, then return
826         to my caller when this is done.
827         """
828         bht = self._block_hash_trees[reader.shnum]
829         needed = bht.needed_hashes(segnum, include_leaf=True)
830         # The root of the block hash tree is also a leaf in the share
831         # hash tree. So we don't need to fetch it from the remote
832         # server. In the case of files with one segment, this means that
833         # we won't fetch any block hash tree from the remote server,
834         # since the hash of each share of the file is the entire block
835         # hash tree, and is a leaf in the share hash tree. This is fine,
836         # since any share corruption will be detected in the share hash
837         # tree.
838         #needed.discard(0)
839         self.log("getting blockhashes for segment %d, share %d: %s" % \
840                  (segnum, reader.shnum, str(needed)))
841         d1 = reader.get_blockhashes(needed, force_remote=True)
842         if self.share_hash_tree.needed_hashes(reader.shnum):
843             need = self.share_hash_tree.needed_hashes(reader.shnum)
844             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
845                                                                  str(need)))
846             d2 = reader.get_sharehashes(need, force_remote=True)
847         else:
848             d2 = defer.succeed({}) # the logic in the next method
849                                    # expects a dict
850         return d1,d2
851
852
853     def _decode_blocks(self, results, segnum):
854         """
855         I take a list of k blocks and salts, and decode that into a
856         single encrypted segment.
857         """
858         # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
859         # want to merge them all
860         blocks_and_salts = {}
861         for d in results:
862             blocks_and_salts.update(d)
863
864         # All of these blocks should have the same salt; in SDMF, it is
865         # the file-wide IV, while in MDMF it is the per-segment salt. In
866         # either case, we just need to get one of them and use it.
867         #
868         # d.items()[0] is like (shnum, (block, salt))
869         # d.items()[0][1] is like (block, salt)
870         # d.items()[0][1][1] is the salt.
871         salt = blocks_and_salts.items()[0][1][1]
872         # Next, extract just the blocks from the dict. We'll use the
873         # salt in the next step.
874         share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
875         d2 = dict(share_and_shareids)
876         shareids = []
877         shares = []
878         for shareid, share in d2.items():
879             shareids.append(shareid)
880             shares.append(share)
881
882         self._set_current_status("decoding")
883         started = time.time()
884         assert len(shareids) >= self._required_shares, len(shareids)
885         # zfec really doesn't want extra shares
886         shareids = shareids[:self._required_shares]
887         shares = shares[:self._required_shares]
888         self.log("decoding segment %d" % segnum)
889         if segnum == self._num_segments - 1:
890             d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
891         else:
892             d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
893         def _process(buffers):
894             segment = "".join(buffers)
895             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
896                      segnum=segnum,
897                      numsegs=self._num_segments,
898                      level=log.NOISY)
899             self.log(" joined length %d, datalength %d" %
900                      (len(segment), self._data_length))
901             if segnum == self._num_segments - 1:
902                 size_to_use = self._tail_data_size
903             else:
904                 size_to_use = self._segment_size
905             segment = segment[:size_to_use]
906             self.log(" segment len=%d" % len(segment))
907             self._status.accumulate_decode_time(time.time() - started)
908             return segment, salt
909         d.addCallback(_process)
910         return d
911
912
913     def _decrypt_segment(self, segment_and_salt):
914         """
915         I take a single segment and its salt, and decrypt it. I return
916         the plaintext of the segment that is in my argument.
917         """
918         segment, salt = segment_and_salt
919         self._set_current_status("decrypting")
920         self.log("decrypting segment %d" % self._current_segment)
921         started = time.time()
922         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
923         decryptor = AES(key)
924         plaintext = decryptor.process(segment)
925         self._status.accumulate_decrypt_time(time.time() - started)
926         return plaintext
927
928
929     def notify_server_corruption(self, server, shnum, reason):
930         rref = server.get_rref()
931         rref.callRemoteOnly("advise_corrupt_share",
932                             "mutable", self._storage_index, shnum, reason)
933
934
935     def _try_to_validate_privkey(self, enc_privkey, reader, server):
936         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
937         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
938         if alleged_writekey != self._node.get_writekey():
939             self.log("invalid privkey from %s shnum %d" %
940                      (reader, reader.shnum),
941                      level=log.WEIRD, umid="YIw4tA")
942             if self._verify:
943                 self.servermap.mark_bad_share(server, reader.shnum,
944                                               self.verinfo[-2])
945                 e = CorruptShareError(server,
946                                       reader.shnum,
947                                       "invalid privkey")
948                 f = failure.Failure(e)
949                 self._bad_shares.add((server, reader.shnum, f))
950             return
951
952         # it's good
953         self.log("got valid privkey from shnum %d on reader %s" %
954                  (reader.shnum, reader))
955         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
956         self._node._populate_encprivkey(enc_privkey)
957         self._node._populate_privkey(privkey)
958         self._need_privkey = False
959
960
961
962     def _done(self):
963         """
964         I am called by _download_current_segment when the download process
965         has finished successfully. After making some useful logging
966         statements, I return the decrypted contents to the owner of this
967         Retrieve object through self._done_deferred.
968         """
969         self._running = False
970         self._status.set_active(False)
971         now = time.time()
972         self._status.timings['total'] = now - self._started
973         self._status.timings['fetch'] = now - self._started_fetching
974         self._status.set_status("Finished")
975         self._status.set_progress(1.0)
976
977         # remember the encoding parameters, use them again next time
978         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
979          offsets_tuple) = self.verinfo
980         self._node._populate_required_shares(k)
981         self._node._populate_total_shares(N)
982
983         if self._verify:
984             ret = self._bad_shares
985             self.log("done verifying, found %d bad shares" % len(ret))
986         else:
987             # TODO: upload status here?
988             ret = self._consumer
989             self._consumer.unregisterProducer()
990         eventually(self._done_deferred.callback, ret)
991
992
993     def _raise_notenoughshareserror(self):
994         """
995         I am called by _activate_enough_servers when there are not enough
996         active servers left to complete the download. After making some
997         useful logging statements, I throw an exception to that effect
998         to the caller of this Retrieve object through
999         self._done_deferred.
1000         """
1001
1002         format = ("ran out of servers: "
1003                   "have %(have)d of %(total)d segments "
1004                   "found %(bad)d bad shares "
1005                   "encoding %(k)d-of-%(n)d")
1006         args = {"have": self._current_segment,
1007                 "total": self._num_segments,
1008                 "need": self._last_segment,
1009                 "k": self._required_shares,
1010                 "n": self._total_shares,
1011                 "bad": len(self._bad_shares)}
1012         raise NotEnoughSharesError("%s, last failure: %s" %
1013                                    (format % args, str(self._last_failure)))
1014
1015     def _error(self, f):
1016         # all errors, including NotEnoughSharesError, land here
1017         self._running = False
1018         self._status.set_active(False)
1019         now = time.time()
1020         self._status.timings['total'] = now - self._started
1021         self._status.timings['fetch'] = now - self._started_fetching
1022         self._status.set_status("Failed")
1023         eventually(self._done_deferred.errback, f)