import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
                                 MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

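# RetrieveStatus records progress, timing, and problem information for a
# single retrieve operation; callers inspect it through the IRetrieveStatus
# interface (see get_status() on Retrieve below).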
class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["decode"] = 0.0
        self.timings["decrypt"] = 0.0
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def accumulate_decode_time(self, elapsed):
        self.timings["decode"] += elapsed
    def accumulate_decrypt_time(self, elapsed):
        self.timings["decrypt"] += elapsed
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

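# A Marker is just a unique token; Retrieve records one for each attempt it
# makes to fetch the encrypted private key (see _privkey_query_markers in
# Retrieve.__init__).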
class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
    implements(IPushProducer)

    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
                 verify=False):
        self._node = filenode
        assert self._node.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = verify or (fetch_privkey
                                        and not self._node.get_privkey())

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            # at once.
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        #
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the
        #    servermap update?
        self._verify = verify

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self.readers = {}
        self._pause_deferred = None
        self._offset = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def _set_current_status(self, state):
        seg = "%d/%d" % (self._current_segment, self._last_segment)
        self._status.set_status("segment %s (%s)" % (seg, state))

    ###################
    # IPushProducer

    def pauseProducing(self):
        """
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.
        """
        if self._pause_deferred is not None:
            return

        self._old_status = self._status.get_status()
        self._set_current_status("paused")

        # fired when the download is unpaused
        self._pause_deferred = defer.Deferred()


    def resumeProducing(self):
        """
        I am called by my download target once it is ready to begin
        receiving data again.
        """
        if self._pause_deferred is None:
            return

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)


    def _check_for_paused(self, res):
        """
        I am called just before a write to the consumer. I return a
        Deferred that eventually fires with the data that is to be
        written to the consumer. If the download has not been paused,
        the Deferred fires immediately. Otherwise, the Deferred fires
        when the downloader is unpaused.
        """
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
            return d
        return defer.succeed(res)


    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify

        if consumer:
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            # IConsumer.
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        self._offset = offset
        self._read_length = size

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)
            # If the servermap update fetched anything, it fetched at least 1
            # KiB, so we ask for that much.
            # TODO: Change the cache methods to allow us to fetch all of the
            # data that they have, then change this method to do that.
            any_cache = self._node._read_from_cache(self.verinfo, shnum,
                                                    0, 1000)
            ss = self.servermap.connections[peerid]
            reader = MDMFSlotReadProxy(ss,
                                       self._storage_index,
                                       shnum,
                                       any_cache)
            reader.peerid = peerid
            self.readers[shnum] = reader


        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._validated_readers = set() # set of readers that we have
                                        # validated the prefix of
        self._block_hash_trees = {} # shnum => hashtree

        # how many shares do we need?
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         prefix,
         offsets_tuple) = self.verinfo


        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})

        # This will set up both the segment decoder and the tail segment
        # decoder, as well as a variety of other instance variables that
        # the download process will use.
        self._setup_encoding_parameters()
        assert len(self.remaining_sharemap) >= k

        self.log("starting download")
        self._started_fetching = time.time()

        self._add_active_peers()

        # The download process beyond this is a state machine.
        # _add_active_peers will select the peers that we want to use
        # for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # peers and corrupt shares as necessary. If it runs out of good
        # peers before downloading all of the segments, _done_deferred
        # will errback.  Otherwise, it will eventually callback with the
        # contents of the mutable file.
        return self._done_deferred


    def decode(self, blocks_and_salts, segnum):
        """
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that
        segment.
        """
        # shnum => block hash tree. Unused, but setup_encoding_parameters will
        # want to set this.
        self._block_hash_trees = None
        self._setup_encoding_parameters()

        # Put the blocks and salts into the shape that _decode_blocks
        # expects: the same (success, payload) pairs that the DeferredList
        # in _process_segment would produce, with each payload mapping
        # shnum to (block, salt).
        blocks_and_salts = blocks_and_salts.items()
        blocks_and_salts = [(True, [d]) for d in blocks_and_salts]

        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)
        return d


    def _setup_encoding_parameters(self):
        """
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoder.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         n,
         known_prefix,
         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

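        # In this layout, SDMF versions carry a file-wide IV in the verinfo,
        # while MDMF versions use per-segment salts instead (see the note in
        # _decode_blocks), so an empty IV is how we recognize an MDMF file.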
        if not IV:
            self._version = MDMF_VERSION
        else:
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
        else:
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
        else:
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,
                                          self._total_shares)
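        # Worked example (illustrative numbers only): with datalength=10000,
        # segsize=3000, and k=3, we get num_segments=4 and tail_data_size=1000,
        # so tail_segment_size = next_multiple(1000, 3) = 1002 and the tail
        # segment gets its own decoder.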

        self.log("got encoding parameters: "
                 "k: %d "
                 "n: %d "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))

        if self._block_hash_trees is not None:
            for i in xrange(self._total_shares):
                # So we don't have to do this later.
                self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        #   - self._start_segment: the segment that we need to start
        #     downloading from.
        #   - self._current_segment: the next segment that we need to
        #     download.
        #   - self._last_segment: The last segment that we were asked to
        #     download.
        #
        #  We say that the download is complete when
        #  self._current_segment > self._last_segment. We use
        #  self._start_segment and self._last_segment to know when to
        #  strip things off of segments, and how much to strip.
        if self._offset:
            self.log("got offset: %d" % self._offset)
            # our start segment is the first segment containing the
            # offset we were given.
            start = self._offset // self._segment_size

            assert start < self._num_segments
            self._start_segment = start
            self.log("got start segment: %d" % self._start_segment)
        else:
            self._start_segment = 0


        # If self._read_length is None, then we want to read the whole
        # file. Otherwise, we want to read only part of the file, and
        # need to figure out where to stop reading.
        if self._read_length is not None:
            # our end segment is the last segment containing part of the
            # data that we were asked to read.
            self.log("got read length %d" % self._read_length)
            if self._read_length != 0:
                end_data = self._offset + self._read_length

                # We don't actually need to read the byte at end_data,
                # but the one before it.
                end = (end_data - 1) // self._segment_size

                assert end < self._num_segments
                self._last_segment = end
            else:
                self._last_segment = self._start_segment
            self.log("got end segment: %d" % self._last_segment)
        else:
            self._last_segment = self._num_segments - 1
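        # Worked example (illustrative numbers only): with segsize=3000,
        # offset=5000, and read_length=2000 we want bytes 5000..6999, so
        # start_segment = 5000 // 3000 = 1 and
        # last_segment = (7000 - 1) // 3000 = 2. _set_segment strips the
        # unwanted bytes from the first and last segments as they arrive.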

        self._current_segment = self._start_segment

    def _add_active_peers(self):
        """
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.
        """
        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow peer
        # that happens to have a primary share is probably more than
        # selecting a really fast peer that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.
        #
        # (these and other questions would be easier to answer with a
        #  robust, configurable tahoe-lafs simulator, which modeled node
        #  failures, differences in node speed, and other characteristics
        #  that we expect storage servers to have.  You could have
        #  presets for really stable grids (like allmydata.com),
        #  friendnets, make it easy to configure your own settings, and
        #  then simulate the effect of big changes on these use cases
        #  instead of just reasoning about what the effect might be. Out
        #  of scope for MDMF, though.)

        # We need at least self._required_shares readers to download a
        # segment.
        if self._verify:
            needed = self._total_shares
        else:
            needed = self._required_shares - len(self._active_readers)
        # XXX: Why don't format= log messages work here?
        self.log("adding %d peers to the active peers list" % needed)

        # We favor lower numbered shares, since FEC is faster with
        # primary shares than with other shares, and lower-numbered
        # shares are more likely to be primary than higher numbered
        # shares.
        active_shnums = set(self.remaining_sharemap.keys())
        # We shouldn't consider adding shares that we already have; this
        # will cause problems later.
        active_shnums -= set([reader.shnum for reader in self._active_readers])
        active_shnums = sorted(active_shnums)[:needed]
        if len(active_shnums) < needed and not self._verify:
            # We don't have enough readers to retrieve the file; fail.
            return self._failed()

        for shnum in active_shnums:
            self._active_readers.append(self.readers[shnum])
            self.log("added reader for share %d" % shnum)
        assert len(self._active_readers) >= self._required_shares
        # Conceptually, this is part of the _add_active_peers step. It
        # validates the prefixes of newly added readers to make sure
        # that they match what we are expecting for self.verinfo. If
        # validation is successful, _validate_active_prefixes will call
        # _download_current_segment for us. If validation is
        # unsuccessful, then _validate_prefixes will remove the peer and
        # call _add_active_peers again, where we will attempt to rectify
        # the problem by choosing another peer.
        return self._validate_active_prefixes()


    def _validate_active_prefixes(self):
        """
        I check to make sure that the prefixes on the peers that I am
        currently reading from match the prefix that we want to see, as
        said in self.verinfo.

        If I find that all of the active peers have acceptable prefixes,
        I pass control to _download_current_segment, which will use
        those peers to do cool things. If I find that some of the active
        peers have unacceptable prefixes, I will remove them from active
        peers (and from further consideration) and call
        _add_active_peers to attempt to rectify the situation. I keep
        track of which peers I have already validated so that I don't
        need to do so again.
        """
        assert self._active_readers, "No more active readers"

        ds = []
        new_readers = set(self._active_readers) - self._validated_readers
        self.log('validating %d newly-added active readers' % len(new_readers))

        for reader in new_readers:
            # We force a remote read here -- otherwise, we are relying
            # on cached data that we already verified as valid, and we
            # won't detect an uncoordinated write that has occurred
            # since the last servermap update.
            d = reader.get_prefix(force_remote=True)
            d.addCallback(self._try_to_validate_prefix, reader)
            ds.append(d)
        dl = defer.DeferredList(ds, consumeErrors=True)
        def _check_results(results):
            # Each result in results will be of the form (success, msg).
            # We don't care about msg, but success will tell us whether
            # or not the checkstring validated. If it didn't, we need to
            # remove the offending (peer,share) from our active readers,
            # and ensure that active readers is again populated.
            bad_readers = []
            for i, result in enumerate(results):
                if not result[0]:
                    reader = self._active_readers[i]
                    f = result[1]
                    assert isinstance(f, failure.Failure)

                    self.log("The reader %s failed to "
                             "properly validate: %s" % \
                             (reader, str(f.value)))
                    bad_readers.append((reader, f))
                else:
                    reader = self._active_readers[i]
                    self.log("the reader %s checks out, so we'll use it" % \
                             reader)
                    self._validated_readers.add(reader)
                    # Each time we validate a reader, we check to see if
                    # we need the private key. If we do, we politely ask
                    # for it and then continue computing. If we find
                    # that we haven't gotten it at the end of
                    # segment decoding, then we'll take more drastic
                    # measures.
                    if self._need_privkey and not self._node.is_readonly():
                        d = reader.get_encprivkey()
                        d.addCallback(self._try_to_validate_privkey, reader)
            if bad_readers:
                # We do them all at once, or else we screw up list indexing.
                for (reader, f) in bad_readers:
                    self._mark_bad_share(reader, f)
                if self._verify:
                    if len(self._active_readers) >= self._required_shares:
                        return self._download_current_segment()
                    else:
                        return self._failed()
                else:
                    return self._add_active_peers()
            else:
                return self._download_current_segment()
            # The next step will assert that it has enough active
            # readers to fetch shares; we just need to remove it.
        dl.addCallback(_check_results)
        return dl


    def _try_to_validate_prefix(self, prefix, reader):
        """
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I return True, which means that I approve of the use of the
        candidate server for segment retrieval. If it doesn't, I return
        False, which means that another server must be chosen.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         known_prefix,
         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.


    def _remove_reader(self, reader):
        """
        At various points, we will wish to remove a peer from
        consideration and/or use. These include, but are not necessarily
        limited to:

            - A connection error.
            - A mismatched prefix (that is, a prefix that does not match
              our conception of the version information string).
            - A failing block hash, salt hash, or share hash, which can
              indicate disk failure/bit flips, or network trouble.

        This method will do that. I will make sure that the
        (shnum,reader) combination represented by my reader argument is
        not used for anything else during this download. I will not
        advise the reader of any corruption, something that my callers
        may wish to do on their own.
        """
        # TODO: When you're done writing this, see if this is ever
        # actually used for something that _mark_bad_share isn't. I have
        # a feeling that they will be used for very similar things, and
        # that having them both here is just going to be an epic amount
        # of code duplication.
        #
        # (well, okay, not epic, but meaningful)
        self.log("removing reader %s" % reader)
        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        # TODO: self.readers.remove(reader)?
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.peerid)


    def _mark_bad_share(self, reader, f):
        """
        I mark the (peerid, shnum) encapsulated by my reader argument as
        a bad share, which means that it will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

            - A connection error to the peer.
            - A mismatched prefix (that is, a prefix that does not match
              our local conception of the version information string).
            - A failing block hash, salt hash, share hash, or other
              integrity check.

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote peer (with no guarantee of success) that its share is
        corrupt.
        """
        self.log("marking share %d on server %s as bad" % \
                 (reader.shnum, reader))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(reader.peerid,
                                      reader.shnum,
                                      prefix)
        self._remove_reader(reader)
        self._bad_shares.add((reader.peerid, reader.shnum, f))
        self._status.problems[reader.peerid] = f
        self._last_failure = f
        self.notify_server_corruption(reader.peerid, reader.shnum,
                                      str(f.value))


    def _download_current_segment(self):
        """
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.
        """
        assert len(self._active_readers) >= self._required_shares
        if self._current_segment <= self._last_segment:
            d = self._process_segment(self._current_segment)
        else:
            d = defer.succeed(None)
        d.addBoth(self._turn_barrier)
        d.addCallback(self._check_for_done)
        return d


    def _turn_barrier(self, result):
        """
        I help the download process avoid the recursion limit issues
        discussed in #237.
        """
        return fireEventually(result)


    def _process_segment(self, segnum):
        """
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then
        decrypting them.
        """
        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?
        assert len(self._active_readers) >= self._required_shares

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        ds = []
        for reader in self._active_readers:
            started = time.time()
            d = reader.get_block_and_salt(segnum, queue=True)
            d2 = self._get_needed_hashes(reader, segnum)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            dl.addCallback(self._validate_block, segnum, reader, started)
            dl.addErrback(self._validation_or_decoding_failed, [reader])
            ds.append(dl)
            reader.flush()
        dl = defer.DeferredList(ds)
        if self._verify:
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
        else:
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
        return dl


    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
        """
        I take the results of fetching and validating the blocks from a
        callback chain in another method. If the results are such that
        they tell me that validation and fetching succeeded without
        incident, I will proceed with decoding and decryption.
        Otherwise, I will do nothing.
        """
        self.log("trying to decode and decrypt segment %d" % segnum)
        failures = False
        for block_and_salt in blocks_and_salts:
            if not block_and_salt[0] or block_and_salt[1] is None:
                self.log("some validation operations failed; not proceeding")
                failures = True
                break
        if not failures:
            self.log("everything looks ok, building segment %d" % segnum)
            d = self._decode_blocks(blocks_and_salts, segnum)
            d.addCallback(self._decrypt_segment)
            d.addErrback(self._validation_or_decoding_failed,
                         self._active_readers)
            # check to see whether we've been paused before writing
            # anything.
            d.addCallback(self._check_for_paused)
            d.addCallback(self._set_segment)
            return d
        else:
            return defer.succeed(None)


    def _set_segment(self, segment):
        """
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.
        """
        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            # receive.
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                            (self._offset % self._segment_size)
            extra %= self._segment_size
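            # With the illustrative numbers from _setup_encoding_parameters
            # (segsize=3000, offset=5000, read_length=2000), extra becomes
            # 2000 - (3000 - 2000) = 1000, so we keep only the first 1000
            # bytes of the last segment.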
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)

        if not self._verify:
            self._consumer.write(segment)
        else:
            # we don't care about the plaintext if we are doing a verify.
            segment = None
        self._current_segment += 1


    def _validation_or_decoding_failed(self, f, readers):
        """
        I am called when a block or a salt fails to correctly validate, or when
        the decryption or decoding operation fails for some reason.  I react to
        this failure by notifying the remote server of corruption, and then
        removing the remote peer from further activity.
        """
        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, peer(s) %s, "
                 "segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader, f)
        return


    def _validate_block(self, results, segnum, reader, started):
        """
        I validate a block from one share on a remote server.
        """
        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
                                                         segnum))
        elapsed = time.time() - started
        self._status.add_fetch_timing(reader.peerid, elapsed)
        self._set_current_status("validating blocks")
        # Did we fail to fetch either of the things that we were
        # supposed to? Fail if so.
        if not results[0][0] or not results[1][0]:
            # handled by the errback handler.

            # These all get batched into one query, so the resulting
            # failure should be the same for all of them, so we can just
            # use the first one we find.
            if not results[0][0]:
                f = results[0][1]
            else:
                f = results[1][1]
            assert isinstance(f, failure.Failure)

            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "Connection error: %s" % str(f))

        block_and_salt, block_and_sharehashes = results
        block, salt = block_and_salt[1]
        blockhashes, sharehashes = block_and_sharehashes[1]

        blockhashes = dict(enumerate(blockhashes[1]))
        self.log("the reader gave me the following blockhashes: %s" % \
                 blockhashes.keys())
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes[1].keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
            try:
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
        else:
            blockhash = hashutil.block_hash(block)
        # If this works without an error, then validation is
        # successful.
        try:
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        if self.share_hash_tree.needed_hashes(reader.shnum,
                                              include_leaf=True) or \
                                              self._verify:
            try:
                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
                                            leaves={reader.shnum: bht[0]})
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
                                                       segnum))
        return {reader.shnum: (block, salt)}


    def _get_needed_hashes(self, reader, segnum):
        """
        I get the hashes needed to validate segnum from the reader, then return
        to my caller when this is done.
        """
        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        # tree.
        #needed.discard(0)
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
        else:
            d2 = defer.succeed({}) # the logic in the next method
                                   # expects a dict
        dl = defer.DeferredList([d1, d2], consumeErrors=True)
        return dl


    def _decode_blocks(self, blocks_and_salts, segnum):
        """
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.
        """
        d = {}
        # We want to merge our dictionaries to the form
        # {shnum: blocks_and_salts}
        #
        # The dictionaries come from validate block that way, so we just
        # need to merge them.
        for block_and_salt in blocks_and_salts:
            d.update(block_and_salt[1])

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = d.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in d.items()]
        d2 = dict(share_and_shareids)
        shareids = []
        shares = []
        for shareid, share in d2.items():
            shareids.append(shareid)
            shares.append(share)

        self._set_current_status("decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
        self.log("decoding segment %d" % segnum)
        if segnum == self._num_segments - 1:
            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
        else:
            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
        def _process(buffers):
            segment = "".join(buffers)
            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                     segnum=segnum,
                     numsegs=self._num_segments,
                     level=log.NOISY)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), self._data_length))
            if segnum == self._num_segments - 1:
                size_to_use = self._tail_data_size
            else:
                size_to_use = self._segment_size
            segment = segment[:size_to_use]
            self.log(" segment len=%d" % len(segment))
            self._status.accumulate_decode_time(time.time() - started)
            return segment, salt
        d.addCallback(_process)
        return d


    def _decrypt_segment(self, segment_and_salt):
        """
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.
        """
        segment, salt = segment_and_salt
        self._set_current_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
        decryptor = AES(key)
        plaintext = decryptor.process(segment)
        self._status.accumulate_decrypt_time(time.time() - started)
        return plaintext


    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self.servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)


    def _try_to_validate_privkey(self, enc_privkey, reader):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            if self._verify:
                self.servermap.mark_bad_share(reader.peerid, reader.shnum,
                                              self.verinfo[-2])
                e = CorruptShareError(reader.peerid,
                                      reader.shnum,
                                      "invalid privkey")
                f = failure.Failure(e)
                self._bad_shares.add((reader.peerid, reader.shnum, f))
            return

        # it's good
        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False


    def _check_for_done(self, res):
        """
        I check to see if this Retrieve object has successfully finished
        its work.

        I can exit in the following ways:
            - If there are no more segments to download, then I exit by
              causing self._done_deferred to fire with the plaintext
              content requested by the caller.
            - If there are still segments to be downloaded, and there
              are enough active readers (readers which have not broken
              and have not given us corrupt data) to continue
              downloading, I send control back to
              _download_current_segment.
            - If there are still segments to be downloaded but there are
              not enough active peers to download them, I ask
              _add_active_peers to add more peers. If it is successful,
              it will call _download_current_segment. If there are not
              enough peers to retrieve the file, then that will cause
              _done_deferred to errback.
        """
        self.log("checking for doneness")
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")
            return self._done()

        if len(self._active_readers) >= self._required_shares:
            # More segments to download, but we have enough good peers
            # in self._active_readers that we can do that without issue,
            # so go nab the next segment.
            self.log("not done yet: on segment %d of %d" % \
                     (self._current_segment + 1, self._num_segments))
            return self._download_current_segment()

        self.log("not done yet: on segment %d of %d, need to add peers" % \
                 (self._current_segment + 1, self._num_segments))
        return self._add_active_peers()


    def _done(self):
        """
        I am called by _check_for_done when the download process has
        finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Finished")
        self._status.set_progress(1.0)

        # remember the encoding parameters, use them again next time
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._node._populate_required_shares(k)
        self._node._populate_total_shares(N)

        if self._verify:
            ret = list(self._bad_shares)
            self.log("done verifying, found %d bad shares" % len(ret))
        else:
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)


    def _failed(self):
        """
        I am called by _add_active_peers when there are not enough
        active peers left to complete the download. After making some
        useful logging statements, I return an exception to that effect
        to the caller of this Retrieve object through
        self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Failed")

        if self._verify:
            ret = list(self._bad_shares)
        else:
            format = ("ran out of peers: "
                      "have %(have)d of %(total)d segments "
                      "found %(bad)d bad shares "
                      "encoding %(k)d-of-%(n)d")
            args = {"have": self._current_segment,
                    "total": self._num_segments,
                    "need": self._last_segment,
                    "k": self._required_shares,
                    "n": self._total_shares,
                    "bad": len(self._bad_shares)}
            e = NotEnoughSharesError("%s, last failure: %s" % \
                                     (format % args, str(self._last_failure)))
            f = failure.Failure(e)
            ret = f
        eventually(self._done_deferred.callback, ret)
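
# Rough usage sketch (for orientation only; the mutable filenode code is the
# real caller of this class):
#
#   r = Retrieve(filenode, servermap, verinfo)
#   d = r.download(consumer=some_IConsumer, offset=0, size=None)
#
# d fires with the consumer once every requested segment has been written to
# it, or with a Failure wrapping NotEnoughSharesError if too many shares turn
# out to be missing or corrupt.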