]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/retrieve.py
mutable/retrieve: rework the mutable downloader to handle multiple-segment files
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
1
2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10                                  MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
17
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
20
21 class RetrieveStatus:
22     implements(IRetrieveStatus)
23     statusid_counter = count(0)
24     def __init__(self):
25         self.timings = {}
26         self.timings["fetch_per_server"] = {}
27         self.timings["cumulative_verify"] = 0.0
28         self.problems = {}
29         self.active = True
30         self.storage_index = None
31         self.helper = False
32         self.encoding = ("?","?")
33         self.size = None
34         self.status = "Not started"
35         self.progress = 0.0
36         self.counter = self.statusid_counter.next()
37         self.started = time.time()
38
39     def get_started(self):
40         return self.started
41     def get_storage_index(self):
42         return self.storage_index
43     def get_encoding(self):
44         return self.encoding
45     def using_helper(self):
46         return self.helper
47     def get_size(self):
48         return self.size
49     def get_status(self):
50         return self.status
51     def get_progress(self):
52         return self.progress
53     def get_active(self):
54         return self.active
55     def get_counter(self):
56         return self.counter
57
58     def add_fetch_timing(self, peerid, elapsed):
59         if peerid not in self.timings["fetch_per_server"]:
60             self.timings["fetch_per_server"][peerid] = []
61         self.timings["fetch_per_server"][peerid].append(elapsed)
62     def set_storage_index(self, si):
63         self.storage_index = si
64     def set_helper(self, helper):
65         self.helper = helper
66     def set_encoding(self, k, n):
67         self.encoding = (k, n)
68     def set_size(self, size):
69         self.size = size
70     def set_status(self, status):
71         self.status = status
72     def set_progress(self, value):
73         self.progress = value
74     def set_active(self, value):
75         self.active = value
76
77 class Marker:
78     pass
79
80 class Retrieve:
81     # this class is currently single-use. Eventually (in MDMF) we will make
82     # it multi-use, in which case you can call download(range) multiple
83     # times, and each will have a separate response chain. However the
84     # Retrieve object will remain tied to a specific version of the file, and
85     # will use a single ServerMap instance.
86     implements(IPushProducer)
87
88     def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
89                  verify=False):
90         self._node = filenode
91         assert self._node.get_pubkey()
92         self._storage_index = filenode.get_storage_index()
93         assert self._node.get_readkey()
94         self._last_failure = None
95         prefix = si_b2a(self._storage_index)[:5]
96         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
97         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
98         self._running = True
99         self._decoding = False
100         self._bad_shares = set()
101
102         self.servermap = servermap
103         assert self._node.get_pubkey()
104         self.verinfo = verinfo
105         # during repair, we may be called upon to grab the private key, since
106         # it wasn't picked up during a verify=False checker run, and we'll
107         # need it for repair to generate a new version.
108         self._need_privkey = fetch_privkey or verify
109         if self._node.get_privkey() and not verify:
110             self._need_privkey = False
111
112         if self._need_privkey:
113             # TODO: Evaluate the need for this. We'll use it if we want
114             # to limit how many queries are on the wire for the privkey
115             # at once.
116             self._privkey_query_markers = [] # one Marker for each time we've
117                                              # tried to get the privkey.
118
119         # verify means that we are using the downloader logic to verify all
120         # of our shares. This tells the downloader a few things.
121         # 
122         # 1. We need to download all of the shares.
123         # 2. We don't need to decode or decrypt the shares, since our
124         #    caller doesn't care about the plaintext, only the
125         #    information about which shares are or are not valid.
126         # 3. When we are validating readers, we need to validate the
127         #    signature on the prefix. Do we? We already do this in the
128         #    servermap update?
129         self._verify = False
130         if verify:
131             self._verify = True
132
133         self._status = RetrieveStatus()
134         self._status.set_storage_index(self._storage_index)
135         self._status.set_helper(False)
136         self._status.set_progress(0.0)
137         self._status.set_active(True)
138         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
139          offsets_tuple) = self.verinfo
140         self._status.set_size(datalength)
141         self._status.set_encoding(k, N)
142         self.readers = {}
143         self._paused = False
144         self._paused_deferred = None
145         self._offset = None
146         self._read_length = None
147         self.log("got seqnum %d" % self.verinfo[0])
148
149
150     def get_status(self):
151         return self._status
152
153     def log(self, *args, **kwargs):
154         if "parent" not in kwargs:
155             kwargs["parent"] = self._log_number
156         if "facility" not in kwargs:
157             kwargs["facility"] = "tahoe.mutable.retrieve"
158         return log.msg(*args, **kwargs)
159
160
161     ###################
162     # IPushProducer
163
164     def pauseProducing(self):
165         """
166         I am called by my download target if we have produced too much
167         data for it to handle. I make the downloader stop producing new
168         data until my resumeProducing method is called.
169         """
170         if self._paused:
171             return
172
173         # fired when the download is unpaused. 
174         self._old_status = self._status.get_status()
175         self._status.set_status("Paused")
176
177         self._pause_deferred = defer.Deferred()
178         self._paused = True
179
180
181     def resumeProducing(self):
182         """
183         I am called by my download target once it is ready to begin
184         receiving data again.
185         """
186         if not self._paused:
187             return
188
189         self._paused = False
190         p = self._pause_deferred
191         self._pause_deferred = None
192         self._status.set_status(self._old_status)
193
194         eventually(p.callback, None)
195
196
197     def _check_for_paused(self, res):
198         """
199         I am called just before a write to the consumer. I return a
200         Deferred that eventually fires with the data that is to be
201         written to the consumer. If the download has not been paused,
202         the Deferred fires immediately. Otherwise, the Deferred fires
203         when the downloader is unpaused.
204         """
205         if self._paused:
206             d = defer.Deferred()
207             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
208             return d
209         return defer.succeed(res)
210
211
212     def download(self, consumer=None, offset=0, size=None):
213         assert IConsumer.providedBy(consumer) or self._verify
214
215         if consumer:
216             self._consumer = consumer
217             # we provide IPushProducer, so streaming=True, per
218             # IConsumer.
219             self._consumer.registerProducer(self, streaming=True)
220
221         self._done_deferred = defer.Deferred()
222         self._started = time.time()
223         self._status.set_status("Retrieving Shares")
224
225         self._offset = offset
226         self._read_length = size
227
228         # first, which servers can we use?
229         versionmap = self.servermap.make_versionmap()
230         shares = versionmap[self.verinfo]
231         # this sharemap is consumed as we decide to send requests
232         self.remaining_sharemap = DictOfSets()
233         for (shnum, peerid, timestamp) in shares:
234             self.remaining_sharemap.add(shnum, peerid)
235             # If the servermap update fetched anything, it fetched at least 1
236             # KiB, so we ask for that much.
237             # TODO: Change the cache methods to allow us to fetch all of the
238             # data that they have, then change this method to do that.
239             any_cache = self._node._read_from_cache(self.verinfo, shnum,
240                                                     0, 1000)
241             ss = self.servermap.connections[peerid]
242             reader = MDMFSlotReadProxy(ss,
243                                        self._storage_index,
244                                        shnum,
245                                        any_cache)
246             reader.peerid = peerid
247             self.readers[shnum] = reader
248
249
250         self.shares = {} # maps shnum to validated blocks
251         self._active_readers = [] # list of active readers for this dl.
252         self._validated_readers = set() # set of readers that we have
253                                         # validated the prefix of
254         self._block_hash_trees = {} # shnum => hashtree
255
256         # how many shares do we need?
257         (seqnum,
258          root_hash,
259          IV,
260          segsize,
261          datalength,
262          k,
263          N,
264          prefix,
265          offsets_tuple) = self.verinfo
266
267
268         # We need one share hash tree for the entire file; its leaves
269         # are the roots of the block hash trees for the shares that
270         # comprise it, and its root is in the verinfo.
271         self.share_hash_tree = hashtree.IncompleteHashTree(N)
272         self.share_hash_tree.set_hashes({0: root_hash})
273
274         # This will set up both the segment decoder and the tail segment
275         # decoder, as well as a variety of other instance variables that
276         # the download process will use.
277         self._setup_encoding_parameters()
278         assert len(self.remaining_sharemap) >= k
279
280         self.log("starting download")
281         self._paused = False
282         self._started_fetching = time.time()
283
284         self._add_active_peers()
285         # The download process beyond this is a state machine.
286         # _add_active_peers will select the peers that we want to use
287         # for the download, and then attempt to start downloading. After
288         # each segment, it will check for doneness, reacting to broken
289         # peers and corrupt shares as necessary. If it runs out of good
290         # peers before downloading all of the segments, _done_deferred
291         # will errback.  Otherwise, it will eventually callback with the
292         # contents of the mutable file.
293         return self._done_deferred
294
295
296     def decode(self, blocks_and_salts, segnum):
297         """
298         I am a helper method that the mutable file update process uses
299         as a shortcut to decode and decrypt the segments that it needs
300         to fetch in order to perform a file update. I take in a
301         collection of blocks and salts, and pick some of those to make a
302         segment with. I return the plaintext associated with that
303         segment.
304         """
305         # shnum => block hash tree. Unusued, but setup_encoding_parameters will
306         # want to set this.
307         # XXX: Make it so that it won't set this if we're just decoding.
308         self._block_hash_trees = {}
309         self._setup_encoding_parameters()
310         # This is the form expected by decode.
311         blocks_and_salts = blocks_and_salts.items()
312         blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
313
314         d = self._decode_blocks(blocks_and_salts, segnum)
315         d.addCallback(self._decrypt_segment)
316         return d
317
318
319     def _setup_encoding_parameters(self):
320         """
321         I set up the encoding parameters, including k, n, the number
322         of segments associated with this file, and the segment decoder.
323         """
324         (seqnum,
325          root_hash,
326          IV,
327          segsize,
328          datalength,
329          k,
330          n,
331          known_prefix,
332          offsets_tuple) = self.verinfo
333         self._required_shares = k
334         self._total_shares = n
335         self._segment_size = segsize
336         self._data_length = datalength
337
338         if not IV:
339             self._version = MDMF_VERSION
340         else:
341             self._version = SDMF_VERSION
342
343         if datalength and segsize:
344             self._num_segments = mathutil.div_ceil(datalength, segsize)
345             self._tail_data_size = datalength % segsize
346         else:
347             self._num_segments = 0
348             self._tail_data_size = 0
349
350         self._segment_decoder = codec.CRSDecoder()
351         self._segment_decoder.set_params(segsize, k, n)
352
353         if  not self._tail_data_size:
354             self._tail_data_size = segsize
355
356         self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
357                                                          self._required_shares)
358         if self._tail_segment_size == self._segment_size:
359             self._tail_decoder = self._segment_decoder
360         else:
361             self._tail_decoder = codec.CRSDecoder()
362             self._tail_decoder.set_params(self._tail_segment_size,
363                                           self._required_shares,
364                                           self._total_shares)
365
366         self.log("got encoding parameters: "
367                  "k: %d "
368                  "n: %d "
369                  "%d segments of %d bytes each (%d byte tail segment)" % \
370                  (k, n, self._num_segments, self._segment_size,
371                   self._tail_segment_size))
372
373         for i in xrange(self._total_shares):
374             # So we don't have to do this later.
375             self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
376
377         # Our last task is to tell the downloader where to start and
378         # where to stop. We use three parameters for that:
379         #   - self._start_segment: the segment that we need to start
380         #     downloading from. 
381         #   - self._current_segment: the next segment that we need to
382         #     download.
383         #   - self._last_segment: The last segment that we were asked to
384         #     download.
385         #
386         #  We say that the download is complete when
387         #  self._current_segment > self._last_segment. We use
388         #  self._start_segment and self._last_segment to know when to
389         #  strip things off of segments, and how much to strip.
390         if self._offset:
391             self.log("got offset: %d" % self._offset)
392             # our start segment is the first segment containing the
393             # offset we were given. 
394             start = mathutil.div_ceil(self._offset,
395                                       self._segment_size)
396             # this gets us the first segment after self._offset. Then
397             # our start segment is the one before it.
398             start -= 1
399
400             assert start < self._num_segments
401             self._start_segment = start
402             self.log("got start segment: %d" % self._start_segment)
403         else:
404             self._start_segment = 0
405
406
407         if self._read_length:
408             # our end segment is the last segment containing part of the
409             # segment that we were asked to read.
410             self.log("got read length %d" % self._read_length)
411             end_data = self._offset + self._read_length
412             end = mathutil.div_ceil(end_data,
413                                     self._segment_size)
414             end -= 1
415             assert end < self._num_segments
416             self._last_segment = end
417             self.log("got end segment: %d" % self._last_segment)
418         else:
419             self._last_segment = self._num_segments - 1
420
421         self._current_segment = self._start_segment
422
423     def _add_active_peers(self):
424         """
425         I populate self._active_readers with enough active readers to
426         retrieve the contents of this mutable file. I am called before
427         downloading starts, and (eventually) after each validation
428         error, connection error, or other problem in the download.
429         """
430         # TODO: It would be cool to investigate other heuristics for
431         # reader selection. For instance, the cost (in time the user
432         # spends waiting for their file) of selecting a really slow peer
433         # that happens to have a primary share is probably more than
434         # selecting a really fast peer that doesn't have a primary
435         # share. Maybe the servermap could be extended to provide this
436         # information; it could keep track of latency information while
437         # it gathers more important data, and then this routine could
438         # use that to select active readers.
439         #
440         # (these and other questions would be easier to answer with a
441         #  robust, configurable tahoe-lafs simulator, which modeled node
442         #  failures, differences in node speed, and other characteristics
443         #  that we expect storage servers to have.  You could have
444         #  presets for really stable grids (like allmydata.com),
445         #  friendnets, make it easy to configure your own settings, and
446         #  then simulate the effect of big changes on these use cases
447         #  instead of just reasoning about what the effect might be. Out
448         #  of scope for MDMF, though.)
449
450         # We need at least self._required_shares readers to download a
451         # segment.
452         if self._verify:
453             needed = self._total_shares
454         else:
455             needed = self._required_shares - len(self._active_readers)
456         # XXX: Why don't format= log messages work here?
457         self.log("adding %d peers to the active peers list" % needed)
458
459         # We favor lower numbered shares, since FEC is faster with
460         # primary shares than with other shares, and lower-numbered
461         # shares are more likely to be primary than higher numbered
462         # shares.
463         active_shnums = set(sorted(self.remaining_sharemap.keys()))
464         # We shouldn't consider adding shares that we already have; this
465         # will cause problems later.
466         active_shnums -= set([reader.shnum for reader in self._active_readers])
467         active_shnums = list(active_shnums)[:needed]
468         if len(active_shnums) < needed and not self._verify:
469             # We don't have enough readers to retrieve the file; fail.
470             return self._failed()
471
472         for shnum in active_shnums:
473             self._active_readers.append(self.readers[shnum])
474             self.log("added reader for share %d" % shnum)
475         assert len(self._active_readers) >= self._required_shares
476         # Conceptually, this is part of the _add_active_peers step. It
477         # validates the prefixes of newly added readers to make sure
478         # that they match what we are expecting for self.verinfo. If
479         # validation is successful, _validate_active_prefixes will call
480         # _download_current_segment for us. If validation is
481         # unsuccessful, then _validate_prefixes will remove the peer and
482         # call _add_active_peers again, where we will attempt to rectify
483         # the problem by choosing another peer.
484         return self._validate_active_prefixes()
485
486
487     def _validate_active_prefixes(self):
488         """
489         I check to make sure that the prefixes on the peers that I am
490         currently reading from match the prefix that we want to see, as
491         said in self.verinfo.
492
493         If I find that all of the active peers have acceptable prefixes,
494         I pass control to _download_current_segment, which will use
495         those peers to do cool things. If I find that some of the active
496         peers have unacceptable prefixes, I will remove them from active
497         peers (and from further consideration) and call
498         _add_active_peers to attempt to rectify the situation. I keep
499         track of which peers I have already validated so that I don't
500         need to do so again.
501         """
502         assert self._active_readers, "No more active readers"
503
504         ds = []
505         new_readers = set(self._active_readers) - self._validated_readers
506         self.log('validating %d newly-added active readers' % len(new_readers))
507
508         for reader in new_readers:
509             # We force a remote read here -- otherwise, we are relying
510             # on cached data that we already verified as valid, and we
511             # won't detect an uncoordinated write that has occurred
512             # since the last servermap update.
513             d = reader.get_prefix(force_remote=True)
514             d.addCallback(self._try_to_validate_prefix, reader)
515             ds.append(d)
516         dl = defer.DeferredList(ds, consumeErrors=True)
517         def _check_results(results):
518             # Each result in results will be of the form (success, msg).
519             # We don't care about msg, but success will tell us whether
520             # or not the checkstring validated. If it didn't, we need to
521             # remove the offending (peer,share) from our active readers,
522             # and ensure that active readers is again populated.
523             bad_readers = []
524             for i, result in enumerate(results):
525                 if not result[0]:
526                     reader = self._active_readers[i]
527                     f = result[1]
528                     assert isinstance(f, failure.Failure)
529
530                     self.log("The reader %s failed to "
531                              "properly validate: %s" % \
532                              (reader, str(f.value)))
533                     bad_readers.append((reader, f))
534                 else:
535                     reader = self._active_readers[i]
536                     self.log("the reader %s checks out, so we'll use it" % \
537                              reader)
538                     self._validated_readers.add(reader)
539                     # Each time we validate a reader, we check to see if
540                     # we need the private key. If we do, we politely ask
541                     # for it and then continue computing. If we find
542                     # that we haven't gotten it at the end of
543                     # segment decoding, then we'll take more drastic
544                     # measures.
545                     if self._need_privkey and not self._node.is_readonly():
546                         d = reader.get_encprivkey()
547                         d.addCallback(self._try_to_validate_privkey, reader)
548             if bad_readers:
549                 # We do them all at once, or else we screw up list indexing.
550                 for (reader, f) in bad_readers:
551                     self._mark_bad_share(reader, f)
552                 if self._verify:
553                     if len(self._active_readers) >= self._required_shares:
554                         return self._download_current_segment()
555                     else:
556                         return self._failed()
557                 else:
558                     return self._add_active_peers()
559             else:
560                 return self._download_current_segment()
561             # The next step will assert that it has enough active
562             # readers to fetch shares; we just need to remove it.
563         dl.addCallback(_check_results)
564         return dl
565
566
567     def _try_to_validate_prefix(self, prefix, reader):
568         """
569         I check that the prefix returned by a candidate server for
570         retrieval matches the prefix that the servermap knows about
571         (and, hence, the prefix that was validated earlier). If it does,
572         I return True, which means that I approve of the use of the
573         candidate server for segment retrieval. If it doesn't, I return
574         False, which means that another server must be chosen.
575         """
576         (seqnum,
577          root_hash,
578          IV,
579          segsize,
580          datalength,
581          k,
582          N,
583          known_prefix,
584          offsets_tuple) = self.verinfo
585         if known_prefix != prefix:
586             self.log("prefix from share %d doesn't match" % reader.shnum)
587             raise UncoordinatedWriteError("Mismatched prefix -- this could "
588                                           "indicate an uncoordinated write")
589         # Otherwise, we're okay -- no issues.
590
591
592     def _remove_reader(self, reader):
593         """
594         At various points, we will wish to remove a peer from
595         consideration and/or use. These include, but are not necessarily
596         limited to:
597
598             - A connection error.
599             - A mismatched prefix (that is, a prefix that does not match
600               our conception of the version information string).
601             - A failing block hash, salt hash, or share hash, which can
602               indicate disk failure/bit flips, or network trouble.
603
604         This method will do that. I will make sure that the
605         (shnum,reader) combination represented by my reader argument is
606         not used for anything else during this download. I will not
607         advise the reader of any corruption, something that my callers
608         may wish to do on their own.
609         """
610         # TODO: When you're done writing this, see if this is ever
611         # actually used for something that _mark_bad_share isn't. I have
612         # a feeling that they will be used for very similar things, and
613         # that having them both here is just going to be an epic amount
614         # of code duplication.
615         #
616         # (well, okay, not epic, but meaningful)
617         self.log("removing reader %s" % reader)
618         # Remove the reader from _active_readers
619         self._active_readers.remove(reader)
620         # TODO: self.readers.remove(reader)?
621         for shnum in list(self.remaining_sharemap.keys()):
622             self.remaining_sharemap.discard(shnum, reader.peerid)
623
624
625     def _mark_bad_share(self, reader, f):
626         """
627         I mark the (peerid, shnum) encapsulated by my reader argument as
628         a bad share, which means that it will not be used anywhere else.
629
630         There are several reasons to want to mark something as a bad
631         share. These include:
632
633             - A connection error to the peer.
634             - A mismatched prefix (that is, a prefix that does not match
635               our local conception of the version information string).
636             - A failing block hash, salt hash, share hash, or other
637               integrity check.
638
639         This method will ensure that readers that we wish to mark bad
640         (for these reasons or other reasons) are not used for the rest
641         of the download. Additionally, it will attempt to tell the
642         remote peer (with no guarantee of success) that its share is
643         corrupt.
644         """
645         self.log("marking share %d on server %s as bad" % \
646                  (reader.shnum, reader))
647         prefix = self.verinfo[-2]
648         self.servermap.mark_bad_share(reader.peerid,
649                                       reader.shnum,
650                                       prefix)
651         self._remove_reader(reader)
652         self._bad_shares.add((reader.peerid, reader.shnum, f))
653         self._status.problems[reader.peerid] = f
654         self._last_failure = f
655         self.notify_server_corruption(reader.peerid, reader.shnum,
656                                       str(f.value))
657
658
659     def _download_current_segment(self):
660         """
661         I download, validate, decode, decrypt, and assemble the segment
662         that this Retrieve is currently responsible for downloading.
663         """
664         assert len(self._active_readers) >= self._required_shares
665         if self._current_segment <= self._last_segment:
666             d = self._process_segment(self._current_segment)
667         else:
668             d = defer.succeed(None)
669         d.addBoth(self._turn_barrier)
670         d.addCallback(self._check_for_done)
671         return d
672
673
674     def _turn_barrier(self, result):
675         """
676         I help the download process avoid the recursion limit issues
677         discussed in #237.
678         """
679         return fireEventually(result)
680
681
682     def _process_segment(self, segnum):
683         """
684         I download, validate, decode, and decrypt one segment of the
685         file that this Retrieve is retrieving. This means coordinating
686         the process of getting k blocks of that file, validating them,
687         assembling them into one segment with the decoder, and then
688         decrypting them.
689         """
690         self.log("processing segment %d" % segnum)
691
692         # TODO: The old code uses a marker. Should this code do that
693         # too? What did the Marker do?
694         assert len(self._active_readers) >= self._required_shares
695
696         # We need to ask each of our active readers for its block and
697         # salt. We will then validate those. If validation is
698         # successful, we will assemble the results into plaintext.
699         ds = []
700         for reader in self._active_readers:
701             started = time.time()
702             d = reader.get_block_and_salt(segnum, queue=True)
703             d2 = self._get_needed_hashes(reader, segnum)
704             dl = defer.DeferredList([d, d2], consumeErrors=True)
705             dl.addCallback(self._validate_block, segnum, reader, started)
706             dl.addErrback(self._validation_or_decoding_failed, [reader])
707             ds.append(dl)
708             reader.flush()
709         dl = defer.DeferredList(ds)
710         if self._verify:
711             dl.addCallback(lambda ignored: "")
712             dl.addCallback(self._set_segment)
713         else:
714             dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
715         return dl
716
717
718     def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
719         """
720         I take the results of fetching and validating the blocks from a
721         callback chain in another method. If the results are such that
722         they tell me that validation and fetching succeeded without
723         incident, I will proceed with decoding and decryption.
724         Otherwise, I will do nothing.
725         """
726         self.log("trying to decode and decrypt segment %d" % segnum)
727         failures = False
728         for block_and_salt in blocks_and_salts:
729             if not block_and_salt[0] or block_and_salt[1] == None:
730                 self.log("some validation operations failed; not proceeding")
731                 failures = True
732                 break
733         if not failures:
734             self.log("everything looks ok, building segment %d" % segnum)
735             d = self._decode_blocks(blocks_and_salts, segnum)
736             d.addCallback(self._decrypt_segment)
737             d.addErrback(self._validation_or_decoding_failed,
738                          self._active_readers)
739             # check to see whether we've been paused before writing
740             # anything.
741             d.addCallback(self._check_for_paused)
742             d.addCallback(self._set_segment)
743             return d
744         else:
745             return defer.succeed(None)
746
747
748     def _set_segment(self, segment):
749         """
750         Given a plaintext segment, I register that segment with the
751         target that is handling the file download.
752         """
753         self.log("got plaintext for segment %d" % self._current_segment)
754         if self._current_segment == self._start_segment:
755             # We're on the first segment. It's possible that we want
756             # only some part of the end of this segment, and that we
757             # just downloaded the whole thing to get that part. If so,
758             # we need to account for that and give the reader just the
759             # data that they want.
760             n = self._offset % self._segment_size
761             self.log("stripping %d bytes off of the first segment" % n)
762             self.log("original segment length: %d" % len(segment))
763             segment = segment[n:]
764             self.log("new segment length: %d" % len(segment))
765
766         if self._current_segment == self._last_segment and self._read_length is not None:
767             # We're on the last segment. It's possible that we only want
768             # part of the beginning of this segment, and that we
769             # downloaded the whole thing anyway. Make sure to give the
770             # caller only the portion of the segment that they want to
771             # receive.
772             extra = self._read_length
773             if self._start_segment != self._last_segment:
774                 extra -= self._segment_size - \
775                             (self._offset % self._segment_size)
776             extra %= self._segment_size
777             self.log("original segment length: %d" % len(segment))
778             segment = segment[:extra]
779             self.log("new segment length: %d" % len(segment))
780             self.log("only taking %d bytes of the last segment" % extra)
781
782         if not self._verify:
783             self._consumer.write(segment)
784         else:
785             # we don't care about the plaintext if we are doing a verify.
786             segment = None
787         self._current_segment += 1
788
789
790     def _validation_or_decoding_failed(self, f, readers):
791         """
792         I am called when a block or a salt fails to correctly validate, or when
793         the decryption or decoding operation fails for some reason.  I react to
794         this failure by notifying the remote server of corruption, and then
795         removing the remote peer from further activity.
796         """
797         assert isinstance(readers, list)
798         bad_shnums = [reader.shnum for reader in readers]
799
800         self.log("validation or decoding failed on share(s) %s, peer(s) %s "
801                  ", segment %d: %s" % \
802                  (bad_shnums, readers, self._current_segment, str(f)))
803         for reader in readers:
804             self._mark_bad_share(reader, f)
805         return
806
807
808     def _validate_block(self, results, segnum, reader, started):
809         """
810         I validate a block from one share on a remote server.
811         """
812         # Grab the part of the block hash tree that is necessary to
813         # validate this block, then generate the block hash root.
814         self.log("validating share %d for segment %d" % (reader.shnum,
815                                                              segnum))
816         self._status.add_fetch_timing(reader.peerid, started)
817         self._status.set_status("Valdiating blocks for segment %d" % segnum)
818         # Did we fail to fetch either of the things that we were
819         # supposed to? Fail if so.
820         if not results[0][0] and results[1][0]:
821             # handled by the errback handler.
822
823             # These all get batched into one query, so the resulting
824             # failure should be the same for all of them, so we can just
825             # use the first one.
826             assert isinstance(results[0][1], failure.Failure)
827
828             f = results[0][1]
829             raise CorruptShareError(reader.peerid,
830                                     reader.shnum,
831                                     "Connection error: %s" % str(f))
832
833         block_and_salt, block_and_sharehashes = results
834         block, salt = block_and_salt[1]
835         blockhashes, sharehashes = block_and_sharehashes[1]
836
837         blockhashes = dict(enumerate(blockhashes[1]))
838         self.log("the reader gave me the following blockhashes: %s" % \
839                  blockhashes.keys())
840         self.log("the reader gave me the following sharehashes: %s" % \
841                  sharehashes[1].keys())
842         bht = self._block_hash_trees[reader.shnum]
843
844         if bht.needed_hashes(segnum, include_leaf=True):
845             try:
846                 bht.set_hashes(blockhashes)
847             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
848                     IndexError), e:
849                 raise CorruptShareError(reader.peerid,
850                                         reader.shnum,
851                                         "block hash tree failure: %s" % e)
852
853         if self._version == MDMF_VERSION:
854             blockhash = hashutil.block_hash(salt + block)
855         else:
856             blockhash = hashutil.block_hash(block)
857         # If this works without an error, then validation is
858         # successful.
859         try:
860            bht.set_hashes(leaves={segnum: blockhash})
861         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
862                 IndexError), e:
863             raise CorruptShareError(reader.peerid,
864                                     reader.shnum,
865                                     "block hash tree failure: %s" % e)
866
867         # Reaching this point means that we know that this segment
868         # is correct. Now we need to check to see whether the share
869         # hash chain is also correct. 
870         # SDMF wrote share hash chains that didn't contain the
871         # leaves, which would be produced from the block hash tree.
872         # So we need to validate the block hash tree first. If
873         # successful, then bht[0] will contain the root for the
874         # shnum, which will be a leaf in the share hash tree, which
875         # will allow us to validate the rest of the tree.
876         if self.share_hash_tree.needed_hashes(reader.shnum,
877                                               include_leaf=True) or \
878                                               self._verify:
879             try:
880                 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
881                                             leaves={reader.shnum: bht[0]})
882             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
883                     IndexError), e:
884                 raise CorruptShareError(reader.peerid,
885                                         reader.shnum,
886                                         "corrupt hashes: %s" % e)
887
888         self.log('share %d is valid for segment %d' % (reader.shnum,
889                                                        segnum))
890         return {reader.shnum: (block, salt)}
891
892
893     def _get_needed_hashes(self, reader, segnum):
894         """
895         I get the hashes needed to validate segnum from the reader, then return
896         to my caller when this is done.
897         """
898         bht = self._block_hash_trees[reader.shnum]
899         needed = bht.needed_hashes(segnum, include_leaf=True)
900         # The root of the block hash tree is also a leaf in the share
901         # hash tree. So we don't need to fetch it from the remote
902         # server. In the case of files with one segment, this means that
903         # we won't fetch any block hash tree from the remote server,
904         # since the hash of each share of the file is the entire block
905         # hash tree, and is a leaf in the share hash tree. This is fine,
906         # since any share corruption will be detected in the share hash
907         # tree.
908         #needed.discard(0)
909         self.log("getting blockhashes for segment %d, share %d: %s" % \
910                  (segnum, reader.shnum, str(needed)))
911         d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
912         if self.share_hash_tree.needed_hashes(reader.shnum):
913             need = self.share_hash_tree.needed_hashes(reader.shnum)
914             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
915                                                                  str(need)))
916             d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
917         else:
918             d2 = defer.succeed({}) # the logic in the next method
919                                    # expects a dict
920         dl = defer.DeferredList([d1, d2], consumeErrors=True)
921         return dl
922
923
924     def _decode_blocks(self, blocks_and_salts, segnum):
925         """
926         I take a list of k blocks and salts, and decode that into a
927         single encrypted segment.
928         """
929         d = {}
930         # We want to merge our dictionaries to the form 
931         # {shnum: blocks_and_salts}
932         #
933         # The dictionaries come from validate block that way, so we just
934         # need to merge them.
935         for block_and_salt in blocks_and_salts:
936             d.update(block_and_salt[1])
937
938         # All of these blocks should have the same salt; in SDMF, it is
939         # the file-wide IV, while in MDMF it is the per-segment salt. In
940         # either case, we just need to get one of them and use it.
941         #
942         # d.items()[0] is like (shnum, (block, salt))
943         # d.items()[0][1] is like (block, salt)
944         # d.items()[0][1][1] is the salt.
945         salt = d.items()[0][1][1]
946         # Next, extract just the blocks from the dict. We'll use the
947         # salt in the next step.
948         share_and_shareids = [(k, v[0]) for k, v in d.items()]
949         d2 = dict(share_and_shareids)
950         shareids = []
951         shares = []
952         for shareid, share in d2.items():
953             shareids.append(shareid)
954             shares.append(share)
955
956         self._status.set_status("Decoding")
957         started = time.time()
958         assert len(shareids) >= self._required_shares, len(shareids)
959         # zfec really doesn't want extra shares
960         shareids = shareids[:self._required_shares]
961         shares = shares[:self._required_shares]
962         self.log("decoding segment %d" % segnum)
963         if segnum == self._num_segments - 1:
964             d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
965         else:
966             d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
967         def _process(buffers):
968             segment = "".join(buffers)
969             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
970                      segnum=segnum,
971                      numsegs=self._num_segments,
972                      level=log.NOISY)
973             self.log(" joined length %d, datalength %d" %
974                      (len(segment), self._data_length))
975             if segnum == self._num_segments - 1:
976                 size_to_use = self._tail_data_size
977             else:
978                 size_to_use = self._segment_size
979             segment = segment[:size_to_use]
980             self.log(" segment len=%d" % len(segment))
981             self._status.timings.setdefault("decode", 0)
982             self._status.timings['decode'] = time.time() - started
983             return segment, salt
984         d.addCallback(_process)
985         return d
986
987
988     def _decrypt_segment(self, segment_and_salt):
989         """
990         I take a single segment and its salt, and decrypt it. I return
991         the plaintext of the segment that is in my argument.
992         """
993         segment, salt = segment_and_salt
994         self._status.set_status("decrypting")
995         self.log("decrypting segment %d" % self._current_segment)
996         started = time.time()
997         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
998         decryptor = AES(key)
999         plaintext = decryptor.process(segment)
1000         self._status.timings.setdefault("decrypt", 0)
1001         self._status.timings['decrypt'] = time.time() - started
1002         return plaintext
1003
1004
1005     def notify_server_corruption(self, peerid, shnum, reason):
1006         ss = self.servermap.connections[peerid]
1007         ss.callRemoteOnly("advise_corrupt_share",
1008                           "mutable", self._storage_index, shnum, reason)
1009
1010
1011     def _try_to_validate_privkey(self, enc_privkey, reader):
1012         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1013         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1014         if alleged_writekey != self._node.get_writekey():
1015             self.log("invalid privkey from %s shnum %d" %
1016                      (reader, reader.shnum),
1017                      level=log.WEIRD, umid="YIw4tA")
1018             if self._verify:
1019                 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
1020                                               self.verinfo[-2])
1021                 e = CorruptShareError(reader.peerid,
1022                                       reader.shnum,
1023                                       "invalid privkey")
1024                 f = failure.Failure(e)
1025                 self._bad_shares.add((reader.peerid, reader.shnum, f))
1026             return
1027
1028         # it's good
1029         self.log("got valid privkey from shnum %d on reader %s" %
1030                  (reader.shnum, reader))
1031         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1032         self._node._populate_encprivkey(enc_privkey)
1033         self._node._populate_privkey(privkey)
1034         self._need_privkey = False
1035
1036
1037     def _check_for_done(self, res):
1038         """
1039         I check to see if this Retrieve object has successfully finished
1040         its work.
1041
1042         I can exit in the following ways:
1043             - If there are no more segments to download, then I exit by
1044               causing self._done_deferred to fire with the plaintext
1045               content requested by the caller.
1046             - If there are still segments to be downloaded, and there
1047               are enough active readers (readers which have not broken
1048               and have not given us corrupt data) to continue
1049               downloading, I send control back to
1050               _download_current_segment.
1051             - If there are still segments to be downloaded but there are
1052               not enough active peers to download them, I ask
1053               _add_active_peers to add more peers. If it is successful,
1054               it will call _download_current_segment. If there are not
1055               enough peers to retrieve the file, then that will cause
1056               _done_deferred to errback.
1057         """
1058         self.log("checking for doneness")
1059         if self._current_segment > self._last_segment:
1060             # No more segments to download, we're done.
1061             self.log("got plaintext, done")
1062             return self._done()
1063
1064         if len(self._active_readers) >= self._required_shares:
1065             # More segments to download, but we have enough good peers
1066             # in self._active_readers that we can do that without issue,
1067             # so go nab the next segment.
1068             self.log("not done yet: on segment %d of %d" % \
1069                      (self._current_segment + 1, self._num_segments))
1070             return self._download_current_segment()
1071
1072         self.log("not done yet: on segment %d of %d, need to add peers" % \
1073                  (self._current_segment + 1, self._num_segments))
1074         return self._add_active_peers()
1075
1076
1077     def _done(self):
1078         """
1079         I am called by _check_for_done when the download process has
1080         finished successfully. After making some useful logging
1081         statements, I return the decrypted contents to the owner of this
1082         Retrieve object through self._done_deferred.
1083         """
1084         self._running = False
1085         self._status.set_active(False)
1086         now = time.time()
1087         self._status.timings['total'] = now - self._started
1088         self._status.timings['fetch'] = now - self._started_fetching
1089
1090         if self._verify:
1091             ret = list(self._bad_shares)
1092             self.log("done verifying, found %d bad shares" % len(ret))
1093         else:
1094             # TODO: upload status here?
1095             ret = self._consumer
1096             self._consumer.unregisterProducer()
1097         eventually(self._done_deferred.callback, ret)
1098
1099
1100     def _failed(self):
1101         """
1102         I am called by _add_active_peers when there are not enough
1103         active peers left to complete the download. After making some
1104         useful logging statements, I return an exception to that effect
1105         to the caller of this Retrieve object through
1106         self._done_deferred.
1107         """
1108         self._running = False
1109         self._status.set_active(False)
1110         now = time.time()
1111         self._status.timings['total'] = now - self._started
1112         self._status.timings['fetch'] = now - self._started_fetching
1113
1114         if self._verify:
1115             ret = list(self._bad_shares)
1116         else:
1117             format = ("ran out of peers: "
1118                       "have %(have)d of %(total)d segments "
1119                       "found %(bad)d bad shares "
1120                       "encoding %(k)d-of-%(n)d")
1121             args = {"have": self._current_segment,
1122                     "total": self._num_segments,
1123                     "need": self._last_segment,
1124                     "k": self._required_shares,
1125                     "n": self._total_shares,
1126                     "bad": len(self._bad_shares)}
1127             e = NotEnoughSharesError("%s, last failure: %s" % \
1128                                      (format % args, str(self._last_failure)))
1129             f = failure.Failure(e)
1130             ret = f
1131         eventually(self._done_deferred.callback, ret)