src/allmydata/mutable/retrieve.py
1
2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10                                  MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
17
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
20
21 class RetrieveStatus:
22     implements(IRetrieveStatus)
23     statusid_counter = count(0)
24     def __init__(self):
25         self.timings = {}
26         self.timings["fetch_per_server"] = {}
27         self.timings["cumulative_verify"] = 0.0
28         self.problems = {}
29         self.active = True
30         self.storage_index = None
31         self.helper = False
32         self.encoding = ("?","?")
33         self.size = None
34         self.status = "Not started"
35         self.progress = 0.0
36         self.counter = self.statusid_counter.next()
37         self.started = time.time()
38
39     def get_started(self):
40         return self.started
41     def get_storage_index(self):
42         return self.storage_index
43     def get_encoding(self):
44         return self.encoding
45     def using_helper(self):
46         return self.helper
47     def get_size(self):
48         return self.size
49     def get_status(self):
50         return self.status
51     def get_progress(self):
52         return self.progress
53     def get_active(self):
54         return self.active
55     def get_counter(self):
56         return self.counter
57
58     def add_fetch_timing(self, peerid, elapsed):
59         if peerid not in self.timings["fetch_per_server"]:
60             self.timings["fetch_per_server"][peerid] = []
61         self.timings["fetch_per_server"][peerid].append(elapsed)
62     def set_storage_index(self, si):
63         self.storage_index = si
64     def set_helper(self, helper):
65         self.helper = helper
66     def set_encoding(self, k, n):
67         self.encoding = (k, n)
68     def set_size(self, size):
69         self.size = size
70     def set_status(self, status):
71         self.status = status
72     def set_progress(self, value):
73         self.progress = value
74     def set_active(self, value):
75         self.active = value
76
77 class Marker:
78     pass
79
80 class Retrieve:
81     # this class is currently single-use. Eventually (in MDMF) we will make
82     # it multi-use, in which case you can call download(range) multiple
83     # times, and each will have a separate response chain. However the
84     # Retrieve object will remain tied to a specific version of the file, and
85     # will use a single ServerMap instance.
86     implements(IPushProducer)
87
88     def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
89                  verify=False):
90         self._node = filenode
91         assert self._node.get_pubkey()
92         self._storage_index = filenode.get_storage_index()
93         assert self._node.get_readkey()
94         self._last_failure = None
95         prefix = si_b2a(self._storage_index)[:5]
96         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
97         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
98         self._running = True
99         self._decoding = False
100         self._bad_shares = set()
101
102         self.servermap = servermap
103         assert self._node.get_pubkey()
104         self.verinfo = verinfo
105         # during repair, we may be called upon to grab the private key, since
106         # it wasn't picked up during a verify=False checker run, and we'll
107         # need it for repair to generate a new version.
108         self._need_privkey = verify or (fetch_privkey
109                                         and not self._node.get_privkey())
110
111         if self._need_privkey:
112             # TODO: Evaluate the need for this. We'll use it if we want
113             # to limit how many queries are on the wire for the privkey
114             # at once.
115             self._privkey_query_markers = [] # one Marker for each time we've
116                                              # tried to get the privkey.
117
118         # verify means that we are using the downloader logic to verify all
119         # of our shares. This tells the downloader a few things.
120         # 
121         # 1. We need to download all of the shares.
122         # 2. We don't need to decode or decrypt the shares, since our
123         #    caller doesn't care about the plaintext, only the
124         #    information about which shares are or are not valid.
125         # 3. When we are validating readers, we need to validate the
126         #    signature on the prefix. Do we? We already do this in the
127         #    servermap update?
128         self._verify = verify
129
130         self._status = RetrieveStatus()
131         self._status.set_storage_index(self._storage_index)
132         self._status.set_helper(False)
133         self._status.set_progress(0.0)
134         self._status.set_active(True)
135         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
136          offsets_tuple) = self.verinfo
137         self._status.set_size(datalength)
138         self._status.set_encoding(k, N)
139         self.readers = {}
140         self._pause_deferred = None
141         self._offset = None
142         self._read_length = None
143         self.log("got seqnum %d" % self.verinfo[0])
144
145
146     def get_status(self):
147         return self._status
148
149     def log(self, *args, **kwargs):
150         if "parent" not in kwargs:
151             kwargs["parent"] = self._log_number
152         if "facility" not in kwargs:
153             kwargs["facility"] = "tahoe.mutable.retrieve"
154         return log.msg(*args, **kwargs)
155
156
157     ###################
158     # IPushProducer
159
160     def pauseProducing(self):
161         """
162         I am called by my download target if we have produced too much
163         data for it to handle. I make the downloader stop producing new
164         data until my resumeProducing method is called.
165         """
166         if self._pause_deferred is not None:
167             return
168
169         # fired when the download is unpaused. 
170         self._old_status = self._status.get_status()
171         self._status.set_status("Paused")
172
173         self._pause_deferred = defer.Deferred()
174
175
176     def resumeProducing(self):
177         """
178         I am called by my download target once it is ready to begin
179         receiving data again.
180         """
181         if self._pause_deferred is None:
182             return
183
184         p = self._pause_deferred
185         self._pause_deferred = None
186         self._status.set_status(self._old_status)
187
188         eventually(p.callback, None)
189
190
191     def _check_for_paused(self, res):
192         """
193         I am called just before a write to the consumer. I return a
194         Deferred that eventually fires with the data that is to be
195         written to the consumer. If the download has not been paused,
196         the Deferred fires immediately. Otherwise, the Deferred fires
197         when the downloader is unpaused.
198         """
199         if self._pause_deferred is not None:
200             d = defer.Deferred()
201             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
202             return d
203         return defer.succeed(res)
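        # Usage note: _maybe_decode_and_decrypt_segment chains this method
        # into its callback chain just before _set_segment, so a paused
        # consumer simply delays delivery of the decrypted segment rather
        # than losing it.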
204
205
206     def download(self, consumer=None, offset=0, size=None):
207         assert IConsumer.providedBy(consumer) or self._verify
208
209         if consumer:
210             self._consumer = consumer
211             # we provide IPushProducer, so streaming=True, per
212             # IConsumer.
213             self._consumer.registerProducer(self, streaming=True)
214
215         self._done_deferred = defer.Deferred()
216         self._started = time.time()
217         self._status.set_status("Retrieving Shares")
218
219         self._offset = offset
220         self._read_length = size
221
222         # first, which servers can we use?
223         versionmap = self.servermap.make_versionmap()
224         shares = versionmap[self.verinfo]
225         # this sharemap is consumed as we decide to send requests
226         self.remaining_sharemap = DictOfSets()
227         for (shnum, peerid, timestamp) in shares:
228             self.remaining_sharemap.add(shnum, peerid)
229             # If the servermap update fetched anything, it fetched at least 1
230             # KiB, so we ask for that much.
231             # TODO: Change the cache methods to allow us to fetch all of the
232             # data that they have, then change this method to do that.
233             any_cache = self._node._read_from_cache(self.verinfo, shnum,
234                                                     0, 1000)
235             ss = self.servermap.connections[peerid]
236             reader = MDMFSlotReadProxy(ss,
237                                        self._storage_index,
238                                        shnum,
239                                        any_cache)
240             reader.peerid = peerid
241             self.readers[shnum] = reader
242
243
244         self.shares = {} # maps shnum to validated blocks
245         self._active_readers = [] # list of active readers for this dl.
246         self._validated_readers = set() # set of readers that we have
247                                         # validated the prefix of
248         self._block_hash_trees = {} # shnum => hashtree
249
250         # how many shares do we need?
251         (seqnum,
252          root_hash,
253          IV,
254          segsize,
255          datalength,
256          k,
257          N,
258          prefix,
259          offsets_tuple) = self.verinfo
260
261
262         # We need one share hash tree for the entire file; its leaves
263         # are the roots of the block hash trees for the shares that
264         # comprise it, and its root is in the verinfo.
265         self.share_hash_tree = hashtree.IncompleteHashTree(N)
266         self.share_hash_tree.set_hashes({0: root_hash})
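        # Sketch of the hash structure (illustrative): with N = 10 shares,
        # the share hash tree has 10 leaves; leaf i is the root of share
        # i's block hash tree (bht[0] in _validate_block), and the root of
        # the share hash tree is the root_hash carried in verinfo.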
267
268         # This will set up both the segment decoder and the tail segment
269         # decoder, as well as a variety of other instance variables that
270         # the download process will use.
271         self._setup_encoding_parameters()
272         assert len(self.remaining_sharemap) >= k
273
274         self.log("starting download")
275         self._started_fetching = time.time()
276
277         self._add_active_peers()
278         # The download process beyond this is a state machine.
279         # _add_active_peers will select the peers that we want to use
280         # for the download, and then attempt to start downloading. After
281         # each segment, it will check for doneness, reacting to broken
282         # peers and corrupt shares as necessary. If it runs out of good
283         # peers before downloading all of the segments, _done_deferred
284         # will errback.  Otherwise, it will eventually callback with the
285         # contents of the mutable file.
286         return self._done_deferred
287
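    # A minimal usage sketch (hypothetical caller, names assumed): given a
    # mutable filenode, a servermap, and a verinfo tuple drawn from it,
    # something like
    #
    #   r = Retrieve(filenode, servermap, verinfo)
    #   d = r.download(consumer=my_consumer, offset=0, size=None)
    #
    # returns a Deferred that fires with the consumer once every segment
    # has been written to it, fires with a list of bad shares when
    # verify=True, or fires with a Failure (e.g. NotEnoughSharesError) if
    # too few good peers remain.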
288
289     def decode(self, blocks_and_salts, segnum):
290         """
291         I am a helper method that the mutable file update process uses
292         as a shortcut to decode and decrypt the segments that it needs
293         to fetch in order to perform a file update. I take in a
294         collection of blocks and salts, and pick some of those to make a
295         segment with. I return the plaintext associated with that
296         segment.
297         """
298         # shnum => block hash tree. Unused, but setup_encoding_parameters will
299         # want to set this.
300         # XXX: Make it so that it won't set this if we're just decoding.
301         self._block_hash_trees = None
302         self._setup_encoding_parameters()
303         # Reshape into the (success, value) form that _decode_blocks expects.
304         blocks_and_salts = blocks_and_salts.items()
305         blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
306
307         d = self._decode_blocks(blocks_and_salts, segnum)
308         d.addCallback(self._decrypt_segment)
309         return d
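        # decode() assumes blocks_and_salts is a dict of
        # {shnum: (block, salt)} with at least k entries -- the same shape
        # that _validate_block produces for a single share; the assert in
        # _decode_blocks enforces the "at least k" part.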
310
311
312     def _setup_encoding_parameters(self):
313         """
314         I set up the encoding parameters, including k, n, the number
315         of segments associated with this file, and the segment decoder.
316         """
317         (seqnum,
318          root_hash,
319          IV,
320          segsize,
321          datalength,
322          k,
323          n,
324          known_prefix,
325          offsets_tuple) = self.verinfo
326         self._required_shares = k
327         self._total_shares = n
328         self._segment_size = segsize
329         self._data_length = datalength
330
331         if not IV:
332             self._version = MDMF_VERSION
333         else:
334             self._version = SDMF_VERSION
335
336         if datalength and segsize:
337             self._num_segments = mathutil.div_ceil(datalength, segsize)
338             self._tail_data_size = datalength % segsize
339         else:
340             self._num_segments = 0
341             self._tail_data_size = 0
342
343         self._segment_decoder = codec.CRSDecoder()
344         self._segment_decoder.set_params(segsize, k, n)
345
346         if not self._tail_data_size:
347             self._tail_data_size = segsize
348
349         self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
350                                                          self._required_shares)
351         if self._tail_segment_size == self._segment_size:
352             self._tail_decoder = self._segment_decoder
353         else:
354             self._tail_decoder = codec.CRSDecoder()
355             self._tail_decoder.set_params(self._tail_segment_size,
356                                           self._required_shares,
357                                           self._total_shares)
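        # Worked example (illustrative numbers only): with datalength=25,
        # segsize=10 and k=3, div_ceil gives 3 segments, the tail holds
        # 25 % 10 = 5 data bytes, and next_multiple(5, 3) pads the tail
        # segment out to 6 bytes so that it still splits evenly into k
        # shares.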
358
359         self.log("got encoding parameters: "
360                  "k: %d "
361                  "n: %d "
362                  "%d segments of %d bytes each (%d byte tail segment)" % \
363                  (k, n, self._num_segments, self._segment_size,
364                   self._tail_segment_size))
365
366         if self._block_hash_trees is not None:
367             for i in xrange(self._total_shares):
368                 # So we don't have to do this later.
369                 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
370
371         # Our last task is to tell the downloader where to start and
372         # where to stop. We use three parameters for that:
373         #   - self._start_segment: the segment that we need to start
374         #     downloading from. 
375         #   - self._current_segment: the next segment that we need to
376         #     download.
377         #   - self._last_segment: The last segment that we were asked to
378         #     download.
379         #
380         #  We say that the download is complete when
381         #  self._current_segment > self._last_segment. We use
382         #  self._start_segment and self._last_segment to know when to
383         #  strip things off of segments, and how much to strip.
384         if self._offset:
385             self.log("got offset: %d" % self._offset)
386             # our start segment is the first segment containing the
387             # offset we were given. 
388             # Integer division gives us the segment that contains the
389             # byte at self._offset; an offset that falls exactly on a
390             # segment boundary belongs to the segment that starts
391             # there.
392             start = self._offset // self._segment_size
393
394             assert start < self._num_segments
395             self._start_segment = start
396             self.log("got start segment: %d" % self._start_segment)
397         else:
398             self._start_segment = 0
399
400
401         if self._read_length:
402             # our end segment is the last segment containing part of the
403             # segment that we were asked to read.
404             self.log("got read length %d" % self._read_length)
405             end_data = self._offset + self._read_length
406             end = mathutil.div_ceil(end_data,
407                                     self._segment_size)
408             end -= 1
409             assert end < self._num_segments
410             self._last_segment = end
411             self.log("got end segment: %d" % self._last_segment)
412         else:
413             self._last_segment = self._num_segments - 1
414
415         self._current_segment = self._start_segment
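        # Worked example (illustrative numbers only): with segsize=100,
        # offset=150 and size=75, the wanted bytes are 150..224, so
        # _start_segment=1 and _last_segment=2; _set_segment later strips
        # 150 % 100 = 50 bytes from the front of segment 1 and keeps only
        # the first 25 bytes of segment 2, for 75 bytes in total.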
416
417     def _add_active_peers(self):
418         """
419         I populate self._active_readers with enough active readers to
420         retrieve the contents of this mutable file. I am called before
421         downloading starts, and (eventually) after each validation
422         error, connection error, or other problem in the download.
423         """
424         # TODO: It would be cool to investigate other heuristics for
425         # reader selection. For instance, the cost (in time the user
426         # spends waiting for their file) of selecting a really slow peer
427         # that happens to have a primary share is probably more than
428         # selecting a really fast peer that doesn't have a primary
429         # share. Maybe the servermap could be extended to provide this
430         # information; it could keep track of latency information while
431         # it gathers more important data, and then this routine could
432         # use that to select active readers.
433         #
434         # (these and other questions would be easier to answer with a
435         #  robust, configurable tahoe-lafs simulator, which modeled node
436         #  failures, differences in node speed, and other characteristics
437         #  that we expect storage servers to have.  You could have
438         #  presets for really stable grids (like allmydata.com),
439         #  friendnets, make it easy to configure your own settings, and
440         #  then simulate the effect of big changes on these use cases
441         #  instead of just reasoning about what the effect might be. Out
442         #  of scope for MDMF, though.)
443
444         # We need at least self._required_shares readers to download a
445         # segment.
446         if self._verify:
447             needed = self._total_shares
448         else:
449             needed = self._required_shares - len(self._active_readers)
450         # XXX: Why don't format= log messages work here?
451         self.log("adding %d peers to the active peers list" % needed)
452
453         # We favor lower numbered shares, since FEC is faster with
454         # primary shares than with other shares, and lower-numbered
455         # shares are more likely to be primary than higher numbered
456         # shares.
457         active_shnums = set(sorted(self.remaining_sharemap.keys()))
458         # We shouldn't consider adding shares that we already have; this
459         # will cause problems later.
460         active_shnums -= set([reader.shnum for reader in self._active_readers])
461         active_shnums = sorted(active_shnums)[:needed]
462         if len(active_shnums) < needed and not self._verify:
463             # We don't have enough readers to retrieve the file; fail.
464             return self._failed()
465
466         for shnum in active_shnums:
467             self._active_readers.append(self.readers[shnum])
468             self.log("added reader for share %d" % shnum)
469         assert len(self._active_readers) >= self._required_shares
470         # Conceptually, this is part of the _add_active_peers step. It
471         # validates the prefixes of newly added readers to make sure
472         # that they match what we are expecting for self.verinfo. If
473         # validation is successful, _validate_active_prefixes will call
474         # _download_current_segment for us. If validation is
475         # unsuccessful, then _validate_prefixes will remove the peer and
476         # call _add_active_peers again, where we will attempt to rectify
477         # the problem by choosing another peer.
478         return self._validate_active_prefixes()
479
480
481     def _validate_active_prefixes(self):
482         """
483         I check to make sure that the prefixes on the peers that I am
484         currently reading from match the prefix that we want to see, as
485         said in self.verinfo.
486
487         If I find that all of the active peers have acceptable prefixes,
488         I pass control to _download_current_segment, which will use
489         those peers to do cool things. If I find that some of the active
490         peers have unacceptable prefixes, I will remove them from active
491         peers (and from further consideration) and call
492         _add_active_peers to attempt to rectify the situation. I keep
493         track of which peers I have already validated so that I don't
494         need to do so again.
495         """
496         assert self._active_readers, "No more active readers"
497
498         ds = []
499         new_readers = set(self._active_readers) - self._validated_readers
500         self.log('validating %d newly-added active readers' % len(new_readers))
501
502         for reader in new_readers:
503             # We force a remote read here -- otherwise, we are relying
504             # on cached data that we already verified as valid, and we
505             # won't detect an uncoordinated write that has occurred
506             # since the last servermap update.
507             d = reader.get_prefix(force_remote=True)
508             d.addCallback(self._try_to_validate_prefix, reader)
509             ds.append(d)
510         dl = defer.DeferredList(ds, consumeErrors=True)
511         def _check_results(results):
512             # Each result in results will be of the form (success, msg).
513             # We don't care about msg, but success will tell us whether
514             # or not the checkstring validated. If it didn't, we need to
515             # remove the offending (peer,share) from our active readers,
516             # and ensure that active readers is again populated.
517             bad_readers = []
518             for i, result in enumerate(results):
519                 if not result[0]:
520                     reader = self._active_readers[i]
521                     f = result[1]
522                     assert isinstance(f, failure.Failure)
523
524                     self.log("The reader %s failed to "
525                              "properly validate: %s" % \
526                              (reader, str(f.value)))
527                     bad_readers.append((reader, f))
528                 else:
529                     reader = self._active_readers[i]
530                     self.log("the reader %s checks out, so we'll use it" % \
531                              reader)
532                     self._validated_readers.add(reader)
533                     # Each time we validate a reader, we check to see if
534                     # we need the private key. If we do, we politely ask
535                     # for it and then continue computing. If we find
536                     # that we haven't gotten it at the end of
537                     # segment decoding, then we'll take more drastic
538                     # measures.
539                     if self._need_privkey and not self._node.is_readonly():
540                         d = reader.get_encprivkey()
541                         d.addCallback(self._try_to_validate_privkey, reader)
542             if bad_readers:
543                 # We do them all at once, or else we screw up list indexing.
544                 for (reader, f) in bad_readers:
545                     self._mark_bad_share(reader, f)
546                 if self._verify:
547                     if len(self._active_readers) >= self._required_shares:
548                         return self._download_current_segment()
549                     else:
550                         return self._failed()
551                 else:
552                     return self._add_active_peers()
553             else:
554                 return self._download_current_segment()
555             # The next step will assert that it has enough active
556             # readers to fetch shares; we just need to remove it.
557         dl.addCallback(_check_results)
558         return dl
559
560
561     def _try_to_validate_prefix(self, prefix, reader):
562         """
563         I check that the prefix returned by a candidate server for
564         retrieval matches the prefix that the servermap knows about
565         (and, hence, the prefix that was validated earlier). If it does,
566         I return True, which means that I approve of the use of the
567         candidate server for segment retrieval. If it doesn't, I return
568         False, which means that another server must be chosen.
569         """
570         (seqnum,
571          root_hash,
572          IV,
573          segsize,
574          datalength,
575          k,
576          N,
577          known_prefix,
578          offsets_tuple) = self.verinfo
579         if known_prefix != prefix:
580             self.log("prefix from share %d doesn't match" % reader.shnum)
581             raise UncoordinatedWriteError("Mismatched prefix -- this could "
582                                           "indicate an uncoordinated write")
583         # Otherwise, we're okay -- no issues.
584
585
586     def _remove_reader(self, reader):
587         """
588         At various points, we will wish to remove a peer from
589         consideration and/or use. These include, but are not necessarily
590         limited to:
591
592             - A connection error.
593             - A mismatched prefix (that is, a prefix that does not match
594               our conception of the version information string).
595             - A failing block hash, salt hash, or share hash, which can
596               indicate disk failure/bit flips, or network trouble.
597
598         This method will do that. I will make sure that the
599         (shnum,reader) combination represented by my reader argument is
600         not used for anything else during this download. I will not
601         advise the reader of any corruption, something that my callers
602         may wish to do on their own.
603         """
604         # TODO: When you're done writing this, see if this is ever
605         # actually used for something that _mark_bad_share isn't. I have
606         # a feeling that they will be used for very similar things, and
607         # that having them both here is just going to be an epic amount
608         # of code duplication.
609         #
610         # (well, okay, not epic, but meaningful)
611         self.log("removing reader %s" % reader)
612         # Remove the reader from _active_readers
613         self._active_readers.remove(reader)
614         # TODO: self.readers.remove(reader)?
615         for shnum in list(self.remaining_sharemap.keys()):
616             self.remaining_sharemap.discard(shnum, reader.peerid)
617
618
619     def _mark_bad_share(self, reader, f):
620         """
621         I mark the (peerid, shnum) encapsulated by my reader argument as
622         a bad share, which means that it will not be used anywhere else.
623
624         There are several reasons to want to mark something as a bad
625         share. These include:
626
627             - A connection error to the peer.
628             - A mismatched prefix (that is, a prefix that does not match
629               our local conception of the version information string).
630             - A failing block hash, salt hash, share hash, or other
631               integrity check.
632
633         This method will ensure that readers that we wish to mark bad
634         (for these reasons or other reasons) are not used for the rest
635         of the download. Additionally, it will attempt to tell the
636         remote peer (with no guarantee of success) that its share is
637         corrupt.
638         """
639         self.log("marking share %d on server %s as bad" % \
640                  (reader.shnum, reader))
641         prefix = self.verinfo[-2]
642         self.servermap.mark_bad_share(reader.peerid,
643                                       reader.shnum,
644                                       prefix)
645         self._remove_reader(reader)
646         self._bad_shares.add((reader.peerid, reader.shnum, f))
647         self._status.problems[reader.peerid] = f
648         self._last_failure = f
649         self.notify_server_corruption(reader.peerid, reader.shnum,
650                                       str(f.value))
651
652
653     def _download_current_segment(self):
654         """
655         I download, validate, decode, decrypt, and assemble the segment
656         that this Retrieve is currently responsible for downloading.
657         """
658         assert len(self._active_readers) >= self._required_shares
659         if self._current_segment <= self._last_segment:
660             d = self._process_segment(self._current_segment)
661         else:
662             d = defer.succeed(None)
663         d.addBoth(self._turn_barrier)
664         d.addCallback(self._check_for_done)
665         return d
666
667
668     def _turn_barrier(self, result):
669         """
670         I help the download process avoid the recursion limit issues
671         discussed in #237.
672         """
673         return fireEventually(result)
674
675
676     def _process_segment(self, segnum):
677         """
678         I download, validate, decode, and decrypt one segment of the
679         file that this Retrieve is retrieving. This means coordinating
680         the process of getting k blocks of that file, validating them,
681         assembling them into one segment with the decoder, and then
682         decrypting them.
683         """
684         self.log("processing segment %d" % segnum)
685
686         # TODO: The old code uses a marker. Should this code do that
687         # too? What did the Marker do?
688         assert len(self._active_readers) >= self._required_shares
689
690         # We need to ask each of our active readers for its block and
691         # salt. We will then validate those. If validation is
692         # successful, we will assemble the results into plaintext.
693         ds = []
694         for reader in self._active_readers:
695             started = time.time()
696             d = reader.get_block_and_salt(segnum, queue=True)
697             d2 = self._get_needed_hashes(reader, segnum)
698             dl = defer.DeferredList([d, d2], consumeErrors=True)
699             dl.addCallback(self._validate_block, segnum, reader, started)
700             dl.addErrback(self._validation_or_decoding_failed, [reader])
701             ds.append(dl)
702             reader.flush()
703         dl = defer.DeferredList(ds)
704         if self._verify:
705             dl.addCallback(lambda ignored: "")
706             dl.addCallback(self._set_segment)
707         else:
708             dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
709         return dl
710
711
712     def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
713         """
714         I take the results of fetching and validating the blocks from a
715         callback chain in another method. If the results are such that
716         they tell me that validation and fetching succeeded without
717         incident, I will proceed with decoding and decryption.
718         Otherwise, I will do nothing.
719         """
720         self.log("trying to decode and decrypt segment %d" % segnum)
721         failures = False
722         for block_and_salt in blocks_and_salts:
723             if not block_and_salt[0] or block_and_salt[1] is None:
724                 self.log("some validation operations failed; not proceeding")
725                 failures = True
726                 break
727         if not failures:
728             self.log("everything looks ok, building segment %d" % segnum)
729             d = self._decode_blocks(blocks_and_salts, segnum)
730             d.addCallback(self._decrypt_segment)
731             d.addErrback(self._validation_or_decoding_failed,
732                          self._active_readers)
733             # check to see whether we've been paused before writing
734             # anything.
735             d.addCallback(self._check_for_paused)
736             d.addCallback(self._set_segment)
737             return d
738         else:
739             return defer.succeed(None)
740
741
742     def _set_segment(self, segment):
743         """
744         Given a plaintext segment, I register that segment with the
745         target that is handling the file download.
746         """
747         self.log("got plaintext for segment %d" % self._current_segment)
748         if self._current_segment == self._start_segment:
749             # We're on the first segment. It's possible that we want
750             # only some part of the end of this segment, and that we
751             # just downloaded the whole thing to get that part. If so,
752             # we need to account for that and give the reader just the
753             # data that they want.
754             n = self._offset % self._segment_size
755             self.log("stripping %d bytes off of the first segment" % n)
756             self.log("original segment length: %d" % len(segment))
757             segment = segment[n:]
758             self.log("new segment length: %d" % len(segment))
759
760         if self._current_segment == self._last_segment and self._read_length is not None:
761             # We're on the last segment. It's possible that we only want
762             # part of the beginning of this segment, and that we
763             # downloaded the whole thing anyway. Make sure to give the
764             # caller only the portion of the segment that they want to
765             # receive.
766             extra = self._read_length
767             if self._start_segment != self._last_segment:
768                 extra -= self._segment_size - \
769                             (self._offset % self._segment_size)
770             extra %= self._segment_size
771             self.log("original segment length: %d" % len(segment))
772             segment = segment[:extra]
773             self.log("new segment length: %d" % len(segment))
774             self.log("only taking %d bytes of the last segment" % extra)
775
776         if not self._verify:
777             self._consumer.write(segment)
778         else:
779             # we don't care about the plaintext if we are doing a verify.
780             segment = None
781         self._current_segment += 1
782
783
784     def _validation_or_decoding_failed(self, f, readers):
785         """
786         I am called when a block or a salt fails to correctly validate, or when
787         the decryption or decoding operation fails for some reason.  I react to
788         this failure by notifying the remote server of corruption, and then
789         removing the remote peer from further activity.
790         """
791         assert isinstance(readers, list)
792         bad_shnums = [reader.shnum for reader in readers]
793
794         self.log("validation or decoding failed on share(s) %s, peer(s) %s, "
795                  "segment %d: %s" % \
796                  (bad_shnums, readers, self._current_segment, str(f)))
797         for reader in readers:
798             self._mark_bad_share(reader, f)
799         return
800
801
802     def _validate_block(self, results, segnum, reader, started):
803         """
804         I validate a block from one share on a remote server.
805         """
806         # Grab the part of the block hash tree that is necessary to
807         # validate this block, then generate the block hash root.
808         self.log("validating share %d for segment %d" % (reader.shnum,
809                                                              segnum))
810         self._status.add_fetch_timing(reader.peerid, started)
811         self._status.set_status("Validating blocks for segment %d" % segnum)
812         # Did we fail to fetch either of the things that we were
813         # supposed to? Fail if so.
814         if not results[0][0] or not results[1][0]:
815             # handled by the errback handler.
816             # These all get batched into one query, so the resulting
817             # failure should be the same for all of them; use whichever
818             # fetch actually failed.
819             if not results[0][0]:
820                 f = results[0][1]
821             else:
822                 f = results[1][1]
823             assert isinstance(f, failure.Failure)
824             raise CorruptShareError(reader.peerid, reader.shnum,
825                                     "Connection error: %s" % str(f))
826
827         block_and_salt, block_and_sharehashes = results
828         block, salt = block_and_salt[1]
829         blockhashes, sharehashes = block_and_sharehashes[1]
830
831         blockhashes = dict(enumerate(blockhashes[1]))
832         self.log("the reader gave me the following blockhashes: %s" % \
833                  blockhashes.keys())
834         self.log("the reader gave me the following sharehashes: %s" % \
835                  sharehashes[1].keys())
836         bht = self._block_hash_trees[reader.shnum]
837
838         if bht.needed_hashes(segnum, include_leaf=True):
839             try:
840                 bht.set_hashes(blockhashes)
841             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
842                     IndexError), e:
843                 raise CorruptShareError(reader.peerid,
844                                         reader.shnum,
845                                         "block hash tree failure: %s" % e)
846
847         if self._version == MDMF_VERSION:
848             blockhash = hashutil.block_hash(salt + block)
849         else:
850             blockhash = hashutil.block_hash(block)
851         # If this works without an error, then validation is
852         # successful.
853         try:
854             bht.set_hashes(leaves={segnum: blockhash})
855         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
856                 IndexError), e:
857             raise CorruptShareError(reader.peerid,
858                                     reader.shnum,
859                                     "block hash tree failure: %s" % e)
860
861         # Reaching this point means that we know that this segment
862         # is correct. Now we need to check to see whether the share
863         # hash chain is also correct. 
864         # SDMF wrote share hash chains that didn't contain the
865         # leaves, which would be produced from the block hash tree.
866         # So we need to validate the block hash tree first. If
867         # successful, then bht[0] will contain the root for the
868         # shnum, which will be a leaf in the share hash tree, which
869         # will allow us to validate the rest of the tree.
870         if self.share_hash_tree.needed_hashes(reader.shnum,
871                                               include_leaf=True) or \
872                                               self._verify:
873             try:
874                 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
875                                             leaves={reader.shnum: bht[0]})
876             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
877                     IndexError), e:
878                 raise CorruptShareError(reader.peerid,
879                                         reader.shnum,
880                                         "corrupt hashes: %s" % e)
881
882         self.log('share %d is valid for segment %d' % (reader.shnum,
883                                                        segnum))
884         return {reader.shnum: (block, salt)}
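        # The dict returned here becomes the value half of the
        # (success, value) pair that the DeferredList in _process_segment
        # produces for this reader; _decode_blocks later merges those
        # dicts into a single {shnum: (block, salt)} mapping.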
885
886
887     def _get_needed_hashes(self, reader, segnum):
888         """
889         I get the hashes needed to validate segnum from the reader, then return
890         to my caller when this is done.
891         """
892         bht = self._block_hash_trees[reader.shnum]
893         needed = bht.needed_hashes(segnum, include_leaf=True)
894         # The root of the block hash tree is also a leaf in the share
895         # hash tree. So we don't need to fetch it from the remote
896         # server. In the case of files with one segment, this means that
897         # we won't fetch any block hash tree from the remote server,
898         # since the hash of each share of the file is the entire block
899         # hash tree, and is a leaf in the share hash tree. This is fine,
900         # since any share corruption will be detected in the share hash
901         # tree.
902         #needed.discard(0)
903         self.log("getting blockhashes for segment %d, share %d: %s" % \
904                  (segnum, reader.shnum, str(needed)))
905         d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
906         if self.share_hash_tree.needed_hashes(reader.shnum):
907             need = self.share_hash_tree.needed_hashes(reader.shnum)
908             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
909                                                                  str(need)))
910             d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
911         else:
912             d2 = defer.succeed({}) # the logic in the next method
913                                    # expects a dict
914         dl = defer.DeferredList([d1, d2], consumeErrors=True)
915         return dl
916
917
918     def _decode_blocks(self, blocks_and_salts, segnum):
919         """
920         I take a list of k blocks and salts, and decode that into a
921         single encrypted segment.
922         """
923         d = {}
924         # We want to merge our dictionaries to the form 
925         # {shnum: blocks_and_salts}
926         #
927         # The dictionaries come from _validate_block in that form, so we just
928         # need to merge them.
929         for block_and_salt in blocks_and_salts:
930             d.update(block_and_salt[1])
931
932         # All of these blocks should have the same salt; in SDMF, it is
933         # the file-wide IV, while in MDMF it is the per-segment salt. In
934         # either case, we just need to get one of them and use it.
935         #
936         # d.items()[0] is like (shnum, (block, salt))
937         # d.items()[0][1] is like (block, salt)
938         # d.items()[0][1][1] is the salt.
939         salt = d.items()[0][1][1]
940         # Next, extract just the blocks from the dict. We'll use the
941         # salt in the next step.
942         share_and_shareids = [(k, v[0]) for k, v in d.items()]
943         d2 = dict(share_and_shareids)
944         shareids = []
945         shares = []
946         for shareid, share in d2.items():
947             shareids.append(shareid)
948             shares.append(share)
949
950         self._status.set_status("Decoding")
951         started = time.time()
952         assert len(shareids) >= self._required_shares, len(shareids)
953         # zfec really doesn't want extra shares
954         shareids = shareids[:self._required_shares]
955         shares = shares[:self._required_shares]
956         self.log("decoding segment %d" % segnum)
957         if segnum == self._num_segments - 1:
958             d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
959         else:
960             d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
961         def _process(buffers):
962             segment = "".join(buffers)
963             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
964                      segnum=segnum,
965                      numsegs=self._num_segments,
966                      level=log.NOISY)
967             self.log(" joined length %d, datalength %d" %
968                      (len(segment), self._data_length))
969             if segnum == self._num_segments - 1:
970                 size_to_use = self._tail_data_size
971             else:
972                 size_to_use = self._segment_size
973             segment = segment[:size_to_use]
974             self.log(" segment len=%d" % len(segment))
975             self._status.timings.setdefault("decode", 0)
976             self._status.timings['decode'] = time.time() - started
977             return segment, salt
978         d.addCallback(_process)
979         return d
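        # Note: the decoded tail segment comes back at _tail_segment_size
        # bytes (it was padded up to a multiple of k when encoded), which
        # is why _process trims it back down to _tail_data_size before
        # the segment is decrypted.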
980
981
982     def _decrypt_segment(self, segment_and_salt):
983         """
984         I take a single segment and its salt, and decrypt it. I return
985         the plaintext of the segment that is in my argument.
986         """
987         segment, salt = segment_and_salt
988         self._status.set_status("decrypting")
989         self.log("decrypting segment %d" % self._current_segment)
990         started = time.time()
991         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
992         decryptor = AES(key)
993         plaintext = decryptor.process(segment)
994         self._status.timings.setdefault("decrypt", 0)
995         self._status.timings['decrypt'] = time.time() - started
996         return plaintext
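        # Note: the decryption key is derived from the readkey plus the
        # salt handed along with the segment (the file-wide IV for SDMF,
        # the per-segment salt for MDMF; see _decode_blocks), so each
        # segment can be decrypted independently of the others.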
997
998
999     def notify_server_corruption(self, peerid, shnum, reason):
1000         ss = self.servermap.connections[peerid]
1001         ss.callRemoteOnly("advise_corrupt_share",
1002                           "mutable", self._storage_index, shnum, reason)
1003
1004
1005     def _try_to_validate_privkey(self, enc_privkey, reader):
1006         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1007         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1008         if alleged_writekey != self._node.get_writekey():
1009             self.log("invalid privkey from %s shnum %d" %
1010                      (reader, reader.shnum),
1011                      level=log.WEIRD, umid="YIw4tA")
1012             if self._verify:
1013                 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
1014                                               self.verinfo[-2])
1015                 e = CorruptShareError(reader.peerid,
1016                                       reader.shnum,
1017                                       "invalid privkey")
1018                 f = failure.Failure(e)
1019                 self._bad_shares.add((reader.peerid, reader.shnum, f))
1020             return
1021
1022         # it's good
1023         self.log("got valid privkey from shnum %d on reader %s" %
1024                  (reader.shnum, reader))
1025         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1026         self._node._populate_encprivkey(enc_privkey)
1027         self._node._populate_privkey(privkey)
1028         self._need_privkey = False
1029
1030
1031     def _check_for_done(self, res):
1032         """
1033         I check to see if this Retrieve object has successfully finished
1034         its work.
1035
1036         I can exit in the following ways:
1037             - If there are no more segments to download, then I exit by
1038               causing self._done_deferred to fire with the plaintext
1039               content requested by the caller.
1040             - If there are still segments to be downloaded, and there
1041               are enough active readers (readers which have not broken
1042               and have not given us corrupt data) to continue
1043               downloading, I send control back to
1044               _download_current_segment.
1045             - If there are still segments to be downloaded but there are
1046               not enough active peers to download them, I ask
1047               _add_active_peers to add more peers. If it is successful,
1048               it will call _download_current_segment. If there are not
1049               enough peers to retrieve the file, then that will cause
1050               _done_deferred to errback.
1051         """
1052         self.log("checking for doneness")
1053         if self._current_segment > self._last_segment:
1054             # No more segments to download, we're done.
1055             self.log("got plaintext, done")
1056             return self._done()
1057
1058         if len(self._active_readers) >= self._required_shares:
1059             # More segments to download, but we have enough good peers
1060             # in self._active_readers that we can do that without issue,
1061             # so go nab the next segment.
1062             self.log("not done yet: on segment %d of %d" % \
1063                      (self._current_segment + 1, self._num_segments))
1064             return self._download_current_segment()
1065
1066         self.log("not done yet: on segment %d of %d, need to add peers" % \
1067                  (self._current_segment + 1, self._num_segments))
1068         return self._add_active_peers()
1069
1070
1071     def _done(self):
1072         """
1073         I am called by _check_for_done when the download process has
1074         finished successfully. After making some useful logging
1075         statements, I return the decrypted contents to the owner of this
1076         Retrieve object through self._done_deferred.
1077         """
1078         self._running = False
1079         self._status.set_active(False)
1080         now = time.time()
1081         self._status.timings['total'] = now - self._started
1082         self._status.timings['fetch'] = now - self._started_fetching
1083
1084         if self._verify:
1085             ret = list(self._bad_shares)
1086             self.log("done verifying, found %d bad shares" % len(ret))
1087         else:
1088             # TODO: upload status here?
1089             ret = self._consumer
1090             self._consumer.unregisterProducer()
1091         eventually(self._done_deferred.callback, ret)
1092
1093
1094     def _failed(self):
1095         """
1096         I am called by _add_active_peers when there are not enough
1097         active peers left to complete the download. After making some
1098         useful logging statements, I return an exception to that effect
1099         to the caller of this Retrieve object through
1100         self._done_deferred.
1101         """
1102         self._running = False
1103         self._status.set_active(False)
1104         now = time.time()
1105         self._status.timings['total'] = now - self._started
1106         self._status.timings['fetch'] = now - self._started_fetching
1107
1108         if self._verify:
1109             ret = list(self._bad_shares)
1110         else:
1111             format = ("ran out of peers: "
1112                       "have %(have)d of %(total)d segments "
1113                       "found %(bad)d bad shares "
1114                       "encoding %(k)d-of-%(n)d")
1115             args = {"have": self._current_segment,
1116                     "total": self._num_segments,
1117                     "need": self._last_segment,
1118                     "k": self._required_shares,
1119                     "n": self._total_shares,
1120                     "bad": len(self._bad_shares)}
1121             e = NotEnoughSharesError("%s, last failure: %s" % \
1122                                      (format % args, str(self._last_failure)))
1123             f = failure.Failure(e)
1124             ret = f
1125         eventually(self._done_deferred.callback, ret)