import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
                                 MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
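
    # A rough sketch of how the Retrieve class below drives one of these
    # status objects (the peerid and numbers are made up for
    # illustration, not taken from a real download):
    #
    #     status = RetrieveStatus()
    #     status.set_storage_index(si)
    #     status.set_encoding(3, 10)          # k-of-N
    #     status.add_fetch_timing(peerid, 0.017)
    #     status.set_status("Retrieving Shares")
    #     ...
    #     status.set_active(False)            # download finished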

class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
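    #
    # A typical (hypothetical) caller looks something like this; the
    # filenode, servermap and verinfo come from an earlier servermap
    # update, and 'consumer' is anything providing IConsumer:
    #
    #     r = Retrieve(filenode, servermap, verinfo)
    #     d = r.download(consumer=consumer, offset=0, size=None)
    #
    # where d is the same Deferred returned by download() below: it
    # fires with the consumer on success, or errbacks if too few good
    # shares remain.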
    implements(IPushProducer)

    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
                 verify=False):
        self._node = filenode
        assert self._node.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = verify or (fetch_privkey
                                        and not self._node.get_privkey())

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            # at once.
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        #
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the
        #    servermap update?
        self._verify = verify

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self.readers = {}
        self._pause_deferred = None
        self._offset = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)


    ###################
    # IPushProducer

    def pauseProducing(self):
        """
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.
        """
        if self._pause_deferred is not None:
            return

        self._old_status = self._status.get_status()
        self._status.set_status("Paused")

        # this Deferred is fired when the download is unpaused.
        self._pause_deferred = defer.Deferred()


    def resumeProducing(self):
        """
        I am called by my download target once it is ready to begin
        receiving data again.
        """
        if self._pause_deferred is None:
            return

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)


    def _check_for_paused(self, res):
        """
        I am called just before a write to the consumer. I return a
        Deferred that eventually fires with the data that is to be
        written to the consumer. If the download has not been paused,
        the Deferred fires immediately. Otherwise, the Deferred fires
        when the downloader is unpaused.
        """
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
            return d
        return defer.succeed(res)
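
    # Illustrative flow (a sketch, not an exact transcript of calls):
    # the consumer calls pauseProducing(), which parks a Deferred in
    # self._pause_deferred; every segment write is routed through
    # _check_for_paused(), so pending data waits on that Deferred;
    # resumeProducing() fires it and the paused write then proceeds.
    #
    #     consumer -> pauseProducing()     # sets self._pause_deferred
    #     ... _check_for_paused(segment)   # returns a waiting Deferred
    #     consumer -> resumeProducing()    # fires it; the write resumes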


    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify

        if consumer:
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            # IConsumer.
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        self._offset = offset
        self._read_length = size

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)
            # If the servermap update fetched anything, it fetched at least 1
            # KiB, so we ask for that much.
            # TODO: Change the cache methods to allow us to fetch all of the
            # data that they have, then change this method to do that.
            any_cache = self._node._read_from_cache(self.verinfo, shnum,
                                                    0, 1000)
            ss = self.servermap.connections[peerid]
            reader = MDMFSlotReadProxy(ss,
                                       self._storage_index,
                                       shnum,
                                       any_cache)
            reader.peerid = peerid
            self.readers[shnum] = reader


        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._validated_readers = set() # set of readers that we have
                                        # validated the prefix of
        self._block_hash_trees = {} # shnum => hashtree

        # how many shares do we need?
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         prefix,
         offsets_tuple) = self.verinfo


        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})

        # This will set up both the segment decoder and the tail segment
        # decoder, as well as a variety of other instance variables that
        # the download process will use.
        self._setup_encoding_parameters()
        assert len(self.remaining_sharemap) >= k

        self.log("starting download")
        self._started_fetching = time.time()

        self._add_active_peers()

        # The download process beyond this is a state machine.
        # _add_active_peers will select the peers that we want to use
        # for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # peers and corrupt shares as necessary. If it runs out of good
        # peers before downloading all of the segments, _done_deferred
        # will errback.  Otherwise, it will eventually callback with the
        # contents of the mutable file.
        return self._done_deferred
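
    # For example (hypothetical numbers), a caller that only wants 8000
    # bytes starting at offset 5000 would call:
    #
    #     d = r.download(consumer=consumer, offset=5000, size=8000)
    #
    # and _setup_encoding_parameters() below turns that byte range into
    # a range of segments to fetch; _set_segment() later trims the
    # partial first and last segments down to the requested bytes.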


    def decode(self, blocks_and_salts, segnum):
        """
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that
        segment.
        """
        # shnum => block hash tree. Unused, but setup_encoding_parameters will
        # want to set this.
        self._block_hash_trees = None
        self._setup_encoding_parameters()

        # This is the form expected by decode.
        blocks_and_salts = blocks_and_salts.items()
        blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
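        # For example (shapes only), {0: (b0, s0), 3: (b3, s3)} becomes
        #     [(True, [(0, (b0, s0))]), (True, [(3, (b3, s3))])]
        # which is close enough to the (success, {shnum: (block, salt)})
        # tuples that a DeferredList of _validate_block results would
        # produce: _decode_blocks only feeds element [1] of each tuple
        # to dict.update(), which accepts either form.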

        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)
        return d


    def _setup_encoding_parameters(self):
        """
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoder.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         n,
         known_prefix,
         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

        if not IV:
            self._version = MDMF_VERSION
        else:
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
        else:
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
        else:
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,
                                          self._total_shares)
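
        # Worked example (numbers chosen purely for illustration): with
        # datalength=14000, segsize=4096 and k=3 we get
        # num_segments = div_ceil(14000, 4096) = 4,
        # tail_data_size = 14000 % 4096 = 1712, and
        # tail_segment_size = next_multiple(1712, 3) = 1713, so the tail
        # segment gets its own decoder while the full-sized segments
        # share self._segment_decoder.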

        self.log("got encoding parameters: "
                 "k: %d "
                 "n: %d "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))

        if self._block_hash_trees is not None:
            for i in xrange(self._total_shares):
                # So we don't have to do this later.
                self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        #   - self._start_segment: the segment that we need to start
        #     downloading from.
        #   - self._current_segment: the next segment that we need to
        #     download.
        #   - self._last_segment: The last segment that we were asked to
        #     download.
        #
        #  We say that the download is complete when
        #  self._current_segment > self._last_segment. We use
        #  self._start_segment and self._last_segment to know when to
        #  strip things off of segments, and how much to strip.
        if self._offset:
            self.log("got offset: %d" % self._offset)
            # our start segment is the first segment containing the
            # offset we were given.
            start = self._offset // self._segment_size

            assert start < self._num_segments
            self._start_segment = start
            self.log("got start segment: %d" % self._start_segment)
        else:
            self._start_segment = 0


        if self._read_length:
            # our end segment is the last segment containing part of the
            # data that we were asked to read.
            self.log("got read length %d" % self._read_length)
            end_data = self._offset + self._read_length

            # We don't actually need to read the byte at end_data, but
            # the one before it.
            end = (end_data - 1) // self._segment_size

            assert end < self._num_segments
            self._last_segment = end
            self.log("got end segment: %d" % self._last_segment)
        else:
            self._last_segment = self._num_segments - 1

        self._current_segment = self._start_segment
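
        # Worked example (hypothetical numbers, matching the sketch near
        # download() above): with segsize=4096, offset=5000 and
        # read_length=8000 we get
        #     _start_segment = 5000 // 4096 = 1
        #     end_data       = 5000 + 8000 = 13000
        #     _last_segment  = (13000 - 1) // 4096 = 3
        # so segments 1 through 3 are fetched, and _set_segment() strips
        # the unwanted bytes off the first and last of them.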

    def _add_active_peers(self):
        """
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.
        """
        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow peer
        # that happens to have a primary share is probably more than
        # selecting a really fast peer that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.
        #
        # (these and other questions would be easier to answer with a
        #  robust, configurable tahoe-lafs simulator, which modeled node
        #  failures, differences in node speed, and other characteristics
        #  that we expect storage servers to have.  You could have
        #  presets for really stable grids (like allmydata.com),
        #  friendnets, make it easy to configure your own settings, and
        #  then simulate the effect of big changes on these use cases
        #  instead of just reasoning about what the effect might be. Out
        #  of scope for MDMF, though.)

        # We need at least self._required_shares readers to download a
        # segment.
        if self._verify:
            needed = self._total_shares
        else:
            needed = self._required_shares - len(self._active_readers)
        # XXX: Why don't format= log messages work here?
        self.log("adding %d peers to the active peers list" % needed)

        # We favor lower numbered shares, since FEC is faster with
        # primary shares than with other shares, and lower-numbered
        # shares are more likely to be primary than higher numbered
        # shares.
        active_shnums = set(sorted(self.remaining_sharemap.keys()))
        # We shouldn't consider adding shares that we already have; this
        # will cause problems later.
        active_shnums -= set([reader.shnum for reader in self._active_readers])
        active_shnums = sorted(active_shnums)[:needed]
        if len(active_shnums) < needed and not self._verify:
            # We don't have enough readers to retrieve the file; fail.
            return self._failed()

        for shnum in active_shnums:
            self._active_readers.append(self.readers[shnum])
            self.log("added reader for share %d" % shnum)
        assert len(self._active_readers) >= self._required_shares
        # Conceptually, this is part of the _add_active_peers step. It
        # validates the prefixes of newly added readers to make sure
        # that they match what we are expecting for self.verinfo. If
        # validation is successful, _validate_active_prefixes will call
        # _download_current_segment for us. If validation is
        # unsuccessful, then _validate_prefixes will remove the peer and
        # call _add_active_peers again, where we will attempt to rectify
        # the problem by choosing another peer.
        return self._validate_active_prefixes()


    def _validate_active_prefixes(self):
        """
        I check to make sure that the prefixes on the peers that I am
        currently reading from match the prefix that we want to see, as
        said in self.verinfo.

        If I find that all of the active peers have acceptable prefixes,
        I pass control to _download_current_segment, which will use
        those peers to do cool things. If I find that some of the active
        peers have unacceptable prefixes, I will remove them from active
        peers (and from further consideration) and call
        _add_active_peers to attempt to rectify the situation. I keep
        track of which peers I have already validated so that I don't
        need to do so again.
        """
        assert self._active_readers, "No more active readers"

        ds = []
        new_readers = set(self._active_readers) - self._validated_readers
        self.log('validating %d newly-added active readers' % len(new_readers))

        for reader in new_readers:
            # We force a remote read here -- otherwise, we are relying
            # on cached data that we already verified as valid, and we
            # won't detect an uncoordinated write that has occurred
            # since the last servermap update.
            d = reader.get_prefix(force_remote=True)
            d.addCallback(self._try_to_validate_prefix, reader)
            ds.append(d)
        dl = defer.DeferredList(ds, consumeErrors=True)
        def _check_results(results):
            # Each result in results will be of the form (success, msg).
            # We don't care about msg, but success will tell us whether
            # or not the checkstring validated. If it didn't, we need to
            # remove the offending (peer,share) from our active readers,
            # and ensure that active readers is again populated.
            bad_readers = []
            for i, result in enumerate(results):
                if not result[0]:
                    reader = self._active_readers[i]
                    f = result[1]
                    assert isinstance(f, failure.Failure)

                    self.log("The reader %s failed to "
                             "properly validate: %s" % \
                             (reader, str(f.value)))
                    bad_readers.append((reader, f))
                else:
                    reader = self._active_readers[i]
                    self.log("the reader %s checks out, so we'll use it" % \
                             reader)
                    self._validated_readers.add(reader)
                    # Each time we validate a reader, we check to see if
                    # we need the private key. If we do, we politely ask
                    # for it and then continue computing. If we find
                    # that we haven't gotten it at the end of
                    # segment decoding, then we'll take more drastic
                    # measures.
                    if self._need_privkey and not self._node.is_readonly():
                        d = reader.get_encprivkey()
                        d.addCallback(self._try_to_validate_privkey, reader)
            if bad_readers:
                # We do them all at once, or else we screw up list indexing.
                for (reader, f) in bad_readers:
                    self._mark_bad_share(reader, f)
                if self._verify:
                    if len(self._active_readers) >= self._required_shares:
                        return self._download_current_segment()
                    else:
                        return self._failed()
                else:
                    return self._add_active_peers()
            else:
                return self._download_current_segment()
            # The next step will assert that it has enough active
            # readers to fetch shares; we just need to remove it.
        dl.addCallback(_check_results)
        return dl


    def _try_to_validate_prefix(self, prefix, reader):
        """
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I return without incident, which means that I approve of the use
        of the candidate server for segment retrieval. If it doesn't, I
        raise UncoordinatedWriteError, which means that another server
        must be chosen.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         known_prefix,
         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.


    def _remove_reader(self, reader):
        """
        At various points, we will wish to remove a peer from
        consideration and/or use. These include, but are not necessarily
        limited to:

            - A connection error.
            - A mismatched prefix (that is, a prefix that does not match
              our conception of the version information string).
            - A failing block hash, salt hash, or share hash, which can
              indicate disk failure/bit flips, or network trouble.

        This method will do that. I will make sure that the
        (shnum,reader) combination represented by my reader argument is
        not used for anything else during this download. I will not
        advise the reader of any corruption, something that my callers
        may wish to do on their own.
        """
        # TODO: When you're done writing this, see if this is ever
        # actually used for something that _mark_bad_share isn't. I have
        # a feeling that they will be used for very similar things, and
        # that having them both here is just going to be an epic amount
        # of code duplication.
        #
        # (well, okay, not epic, but meaningful)
        self.log("removing reader %s" % reader)
        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        # TODO: self.readers.remove(reader)?
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.peerid)


    def _mark_bad_share(self, reader, f):
        """
        I mark the (peerid, shnum) encapsulated by my reader argument as
        a bad share, which means that it will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

            - A connection error to the peer.
            - A mismatched prefix (that is, a prefix that does not match
              our local conception of the version information string).
            - A failing block hash, salt hash, share hash, or other
              integrity check.

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote peer (with no guarantee of success) that its share is
        corrupt.
        """
        self.log("marking share %d on server %s as bad" % \
                 (reader.shnum, reader))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(reader.peerid,
                                      reader.shnum,
                                      prefix)
        self._remove_reader(reader)
        self._bad_shares.add((reader.peerid, reader.shnum, f))
        self._status.problems[reader.peerid] = f
        self._last_failure = f
        self.notify_server_corruption(reader.peerid, reader.shnum,
                                      str(f.value))


    def _download_current_segment(self):
        """
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.
        """
        assert len(self._active_readers) >= self._required_shares
        if self._current_segment <= self._last_segment:
            d = self._process_segment(self._current_segment)
        else:
            d = defer.succeed(None)
        d.addBoth(self._turn_barrier)
        d.addCallback(self._check_for_done)
        return d


    def _turn_barrier(self, result):
        """
        I help the download process avoid the recursion limit issues
        discussed in #237.
        """
        return fireEventually(result)


    def _process_segment(self, segnum):
        """
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then
        decrypting them.
        """
        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?
        assert len(self._active_readers) >= self._required_shares

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        ds = []
        for reader in self._active_readers:
            started = time.time()
            d = reader.get_block_and_salt(segnum, queue=True)
            d2 = self._get_needed_hashes(reader, segnum)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            dl.addCallback(self._validate_block, segnum, reader, started)
            dl.addErrback(self._validation_or_decoding_failed, [reader])
            ds.append(dl)
            reader.flush()
        dl = defer.DeferredList(ds)
        if self._verify:
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
        else:
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
        return dl


    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
        """
        I take the results of fetching and validating the blocks from a
        callback chain in another method. If the results are such that
        they tell me that validation and fetching succeeded without
        incident, I will proceed with decoding and decryption.
        Otherwise, I will do nothing.
        """
        self.log("trying to decode and decrypt segment %d" % segnum)
        failures = False
        for block_and_salt in blocks_and_salts:
            if not block_and_salt[0] or block_and_salt[1] is None:
                self.log("some validation operations failed; not proceeding")
                failures = True
                break
        if not failures:
            self.log("everything looks ok, building segment %d" % segnum)
            d = self._decode_blocks(blocks_and_salts, segnum)
            d.addCallback(self._decrypt_segment)
            d.addErrback(self._validation_or_decoding_failed,
                         self._active_readers)
            # check to see whether we've been paused before writing
            # anything.
            d.addCallback(self._check_for_paused)
            d.addCallback(self._set_segment)
            return d
        else:
            return defer.succeed(None)


    def _set_segment(self, segment):
        """
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.
        """
        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            # receive.
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                            (self._offset % self._segment_size)
            extra %= self._segment_size
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)
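
        # Continuing the hypothetical numbers used in
        # _setup_encoding_parameters (segsize=4096, offset=5000,
        # read_length=8000): the first segment loses 5000 % 4096 = 904
        # leading bytes, any middle segment is passed through whole, and
        # for the last segment extra = (8000 - (4096 - 904)) % 4096 = 712,
        # so only its first 712 bytes are kept:
        # 3192 + 4096 + 712 = 8000 bytes delivered in total.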

        if not self._verify:
            self._consumer.write(segment)
        else:
            # we don't care about the plaintext if we are doing a verify.
            segment = None
        self._current_segment += 1


    def _validation_or_decoding_failed(self, f, readers):
        """
        I am called when a block or a salt fails to correctly validate, or when
        the decryption or decoding operation fails for some reason.  I react to
        this failure by notifying the remote server of corruption, and then
        removing the remote peer from further activity.
        """
        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, peer(s) %s"
                 ", segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader, f)
        return


    def _validate_block(self, results, segnum, reader, started):
        """
        I validate a block from one share on a remote server.
        """
        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
                                                         segnum))
        self._status.add_fetch_timing(reader.peerid, time.time() - started)
        self._status.set_status("Validating blocks for segment %d" % segnum)
        # Did we fail to fetch either of the things that we were
        # supposed to? Fail if so.
        if not results[0][0] or not results[1][0]:
            # handled by the errback handler.

            # These all get batched into one query, so the resulting
            # failure should be the same for all of them, so we can just
            # use the first one that failed.
            f = [r[1] for r in results if not r[0]][0]
            assert isinstance(f, failure.Failure)

            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "Connection error: %s" % str(f))
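
        # At this point 'results' has the shape (values elided):
        #     [ (True, (block, salt)),
        #       (True, [ (True, blockhashes), (True, sharehashes) ]) ]
        # i.e. the outer DeferredList from _process_segment pairs the
        # block-and-salt fetch with the nested DeferredList built by
        # _get_needed_hashes.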
        block_and_salt, block_and_sharehashes = results
        block, salt = block_and_salt[1]
        blockhashes, sharehashes = block_and_sharehashes[1]

        blockhashes = dict(enumerate(blockhashes[1]))
        self.log("the reader gave me the following blockhashes: %s" % \
                 blockhashes.keys())
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes[1].keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
            try:
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
        else:
            blockhash = hashutil.block_hash(block)
        # If this works without an error, then validation is
        # successful.
        try:
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        if self.share_hash_tree.needed_hashes(reader.shnum,
                                              include_leaf=True) or \
                                              self._verify:
            try:
                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
                                            leaves={reader.shnum: bht[0]})
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
                                                       segnum))
        return {reader.shnum: (block, salt)}


    def _get_needed_hashes(self, reader, segnum):
        """
        I get the hashes needed to validate segnum from the reader, then return
        to my caller when this is done.
        """
        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        # tree.
        #needed.discard(0)
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
        else:
            d2 = defer.succeed({}) # the logic in the next method
                                   # expects a dict
        dl = defer.DeferredList([d1, d2], consumeErrors=True)
        return dl


    def _decode_blocks(self, blocks_and_salts, segnum):
        """
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.
        """
        d = {}
        # We want to merge our dictionaries to the form
        # {shnum: blocks_and_salts}
        #
        # The dictionaries come from validate block that way, so we just
        # need to merge them.
        for block_and_salt in blocks_and_salts:
            d.update(block_and_salt[1])

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = d.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in d.items()]
        d2 = dict(share_and_shareids)
        shareids = []
        shares = []
        for shareid, share in d2.items():
            shareids.append(shareid)
            shares.append(share)

        self._status.set_status("Decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
        self.log("decoding segment %d" % segnum)
        if segnum == self._num_segments - 1:
            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
        else:
            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
        def _process(buffers):
            segment = "".join(buffers)
            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                     segnum=segnum,
                     numsegs=self._num_segments,
                     level=log.NOISY)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), self._data_length))
            if segnum == self._num_segments - 1:
                size_to_use = self._tail_data_size
            else:
                size_to_use = self._segment_size
            segment = segment[:size_to_use]
            self.log(" segment len=%d" % len(segment))
            self._status.timings.setdefault("decode", 0)
            self._status.timings['decode'] += time.time() - started
            return segment, salt
        d.addCallback(_process)
        return d


    def _decrypt_segment(self, segment_and_salt):
        """
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.
        """
        segment, salt = segment_and_salt
        self._status.set_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
        decryptor = AES(key)
        plaintext = decryptor.process(segment)
        self._status.timings.setdefault("decrypt", 0)
        self._status.timings['decrypt'] += time.time() - started
        return plaintext


    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self.servermap.connections[peerid]
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)


    def _try_to_validate_privkey(self, enc_privkey, reader):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            if self._verify:
                self.servermap.mark_bad_share(reader.peerid, reader.shnum,
                                              self.verinfo[-2])
                e = CorruptShareError(reader.peerid,
                                      reader.shnum,
                                      "invalid privkey")
                f = failure.Failure(e)
                self._bad_shares.add((reader.peerid, reader.shnum, f))
            return

        # it's good
        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False


    def _check_for_done(self, res):
        """
        I check to see if this Retrieve object has successfully finished
        its work.

        I can exit in the following ways:
            - If there are no more segments to download, then I exit by
              causing self._done_deferred to fire with the plaintext
              content requested by the caller.
            - If there are still segments to be downloaded, and there
              are enough active readers (readers which have not broken
              and have not given us corrupt data) to continue
              downloading, I send control back to
              _download_current_segment.
            - If there are still segments to be downloaded but there are
              not enough active peers to download them, I ask
              _add_active_peers to add more peers. If it is successful,
              it will call _download_current_segment. If there are not
              enough peers to retrieve the file, then that will cause
              _done_deferred to errback.
        """
        self.log("checking for doneness")
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")
            return self._done()

        if len(self._active_readers) >= self._required_shares:
            # More segments to download, but we have enough good peers
            # in self._active_readers that we can do that without issue,
            # so go nab the next segment.
            self.log("not done yet: on segment %d of %d" % \
                     (self._current_segment + 1, self._num_segments))
            return self._download_current_segment()

        self.log("not done yet: on segment %d of %d, need to add peers" % \
                 (self._current_segment + 1, self._num_segments))
        return self._add_active_peers()


    def _done(self):
        """
        I am called by _check_for_done when the download process has
        finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching

        # remember the encoding parameters, use them again next time
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._node._populate_required_shares(k)
        self._node._populate_total_shares(N)

        if self._verify:
            ret = list(self._bad_shares)
            self.log("done verifying, found %d bad shares" % len(ret))
        else:
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)


    def _failed(self):
        """
        I am called by _add_active_peers when there are not enough
        active peers left to complete the download. After making some
        useful logging statements, I return an exception to that effect
        to the caller of this Retrieve object through
        self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching

        if self._verify:
            ret = list(self._bad_shares)
        else:
            format = ("ran out of peers: "
                      "have %(have)d of %(total)d segments "
                      "found %(bad)d bad shares "
                      "encoding %(k)d-of-%(n)d")
            args = {"have": self._current_segment,
                    "total": self._num_segments,
                    "need": self._last_segment,
                    "k": self._required_shares,
                    "n": self._total_shares,
                    "bad": len(self._bad_shares)}
            e = NotEnoughSharesError("%s, last failure: %s" % \
                                     (format % args, str(self._last_failure)))
            f = failure.Failure(e)
            ret = f
        eventually(self._done_deferred.callback, ret)