]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/mutable/retrieve.py
mutable/retrieve.py: use floor division to calculate segment boundaries, don't fetch...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
1
2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10                                  MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
17
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
20
21 class RetrieveStatus:
22     implements(IRetrieveStatus)
23     statusid_counter = count(0)
24     def __init__(self):
25         self.timings = {}
26         self.timings["fetch_per_server"] = {}
27         self.timings["cumulative_verify"] = 0.0
28         self.problems = {}
29         self.active = True
30         self.storage_index = None
31         self.helper = False
32         self.encoding = ("?","?")
33         self.size = None
34         self.status = "Not started"
35         self.progress = 0.0
36         self.counter = self.statusid_counter.next()
37         self.started = time.time()
38
39     def get_started(self):
40         return self.started
41     def get_storage_index(self):
42         return self.storage_index
43     def get_encoding(self):
44         return self.encoding
45     def using_helper(self):
46         return self.helper
47     def get_size(self):
48         return self.size
49     def get_status(self):
50         return self.status
51     def get_progress(self):
52         return self.progress
53     def get_active(self):
54         return self.active
55     def get_counter(self):
56         return self.counter
57
58     def add_fetch_timing(self, peerid, elapsed):
59         if peerid not in self.timings["fetch_per_server"]:
60             self.timings["fetch_per_server"][peerid] = []
61         self.timings["fetch_per_server"][peerid].append(elapsed)
62     def set_storage_index(self, si):
63         self.storage_index = si
64     def set_helper(self, helper):
65         self.helper = helper
66     def set_encoding(self, k, n):
67         self.encoding = (k, n)
68     def set_size(self, size):
69         self.size = size
70     def set_status(self, status):
71         self.status = status
72     def set_progress(self, value):
73         self.progress = value
74     def set_active(self, value):
75         self.active = value
76
77 class Marker:
78     pass
79
80 class Retrieve:
81     # this class is currently single-use. Eventually (in MDMF) we will make
82     # it multi-use, in which case you can call download(range) multiple
83     # times, and each will have a separate response chain. However the
84     # Retrieve object will remain tied to a specific version of the file, and
85     # will use a single ServerMap instance.
86     implements(IPushProducer)
87
88     def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
89                  verify=False):
90         self._node = filenode
91         assert self._node.get_pubkey()
92         self._storage_index = filenode.get_storage_index()
93         assert self._node.get_readkey()
94         self._last_failure = None
95         prefix = si_b2a(self._storage_index)[:5]
96         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
97         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
98         self._running = True
99         self._decoding = False
100         self._bad_shares = set()
101
102         self.servermap = servermap
103         assert self._node.get_pubkey()
104         self.verinfo = verinfo
105         # during repair, we may be called upon to grab the private key, since
106         # it wasn't picked up during a verify=False checker run, and we'll
107         # need it for repair to generate a new version.
108         self._need_privkey = verify or (fetch_privkey
109                                         and not self._node.get_privkey())
110
111         if self._need_privkey:
112             # TODO: Evaluate the need for this. We'll use it if we want
113             # to limit how many queries are on the wire for the privkey
114             # at once.
115             self._privkey_query_markers = [] # one Marker for each time we've
116                                              # tried to get the privkey.
117
118         # verify means that we are using the downloader logic to verify all
119         # of our shares. This tells the downloader a few things.
120         # 
121         # 1. We need to download all of the shares.
122         # 2. We don't need to decode or decrypt the shares, since our
123         #    caller doesn't care about the plaintext, only the
124         #    information about which shares are or are not valid.
125         # 3. When we are validating readers, we need to validate the
126         #    signature on the prefix. Do we? We already do this in the
127         #    servermap update?
128         self._verify = verify
129
130         self._status = RetrieveStatus()
131         self._status.set_storage_index(self._storage_index)
132         self._status.set_helper(False)
133         self._status.set_progress(0.0)
134         self._status.set_active(True)
135         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
136          offsets_tuple) = self.verinfo
137         self._status.set_size(datalength)
138         self._status.set_encoding(k, N)
139         self.readers = {}
140         self._pause_deferred = None
141         self._offset = None
142         self._read_length = None
143         self.log("got seqnum %d" % self.verinfo[0])
144
145
146     def get_status(self):
147         return self._status
148
149     def log(self, *args, **kwargs):
150         if "parent" not in kwargs:
151             kwargs["parent"] = self._log_number
152         if "facility" not in kwargs:
153             kwargs["facility"] = "tahoe.mutable.retrieve"
154         return log.msg(*args, **kwargs)
155
156
157     ###################
158     # IPushProducer
159
160     def pauseProducing(self):
161         """
162         I am called by my download target if we have produced too much
163         data for it to handle. I make the downloader stop producing new
164         data until my resumeProducing method is called.
165         """
166         if self._pause_deferred is not None:
167             return
168
169         # fired when the download is unpaused. 
170         self._old_status = self._status.get_status()
171         self._status.set_status("Paused")
172
173         self._pause_deferred = defer.Deferred()
174
175
176     def resumeProducing(self):
177         """
178         I am called by my download target once it is ready to begin
179         receiving data again.
180         """
181         if self._pause_deferred is None:
182             return
183
184         p = self._pause_deferred
185         self._pause_deferred = None
186         self._status.set_status(self._old_status)
187
188         eventually(p.callback, None)
189
190
191     def _check_for_paused(self, res):
192         """
193         I am called just before a write to the consumer. I return a
194         Deferred that eventually fires with the data that is to be
195         written to the consumer. If the download has not been paused,
196         the Deferred fires immediately. Otherwise, the Deferred fires
197         when the downloader is unpaused.
198         """
199         if self._pause_deferred is not None:
200             d = defer.Deferred()
201             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
202             return d
203         return defer.succeed(res)
204
205
206     def download(self, consumer=None, offset=0, size=None):
207         assert IConsumer.providedBy(consumer) or self._verify
208
209         if consumer:
210             self._consumer = consumer
211             # we provide IPushProducer, so streaming=True, per
212             # IConsumer.
213             self._consumer.registerProducer(self, streaming=True)
214
215         self._done_deferred = defer.Deferred()
216         self._started = time.time()
217         self._status.set_status("Retrieving Shares")
218
219         self._offset = offset
220         self._read_length = size
221
222         # first, which servers can we use?
223         versionmap = self.servermap.make_versionmap()
224         shares = versionmap[self.verinfo]
225         # this sharemap is consumed as we decide to send requests
226         self.remaining_sharemap = DictOfSets()
227         for (shnum, peerid, timestamp) in shares:
228             self.remaining_sharemap.add(shnum, peerid)
229             # If the servermap update fetched anything, it fetched at least 1
230             # KiB, so we ask for that much.
231             # TODO: Change the cache methods to allow us to fetch all of the
232             # data that they have, then change this method to do that.
233             any_cache = self._node._read_from_cache(self.verinfo, shnum,
234                                                     0, 1000)
235             ss = self.servermap.connections[peerid]
236             reader = MDMFSlotReadProxy(ss,
237                                        self._storage_index,
238                                        shnum,
239                                        any_cache)
240             reader.peerid = peerid
241             self.readers[shnum] = reader
242
243
244         self.shares = {} # maps shnum to validated blocks
245         self._active_readers = [] # list of active readers for this dl.
246         self._validated_readers = set() # set of readers that we have
247                                         # validated the prefix of
248         self._block_hash_trees = {} # shnum => hashtree
249
250         # how many shares do we need?
251         (seqnum,
252          root_hash,
253          IV,
254          segsize,
255          datalength,
256          k,
257          N,
258          prefix,
259          offsets_tuple) = self.verinfo
260
261
262         # We need one share hash tree for the entire file; its leaves
263         # are the roots of the block hash trees for the shares that
264         # comprise it, and its root is in the verinfo.
265         self.share_hash_tree = hashtree.IncompleteHashTree(N)
266         self.share_hash_tree.set_hashes({0: root_hash})
267
268         # This will set up both the segment decoder and the tail segment
269         # decoder, as well as a variety of other instance variables that
270         # the download process will use.
271         self._setup_encoding_parameters()
272         assert len(self.remaining_sharemap) >= k
273
274         self.log("starting download")
275         self._started_fetching = time.time()
276
277         self._add_active_peers()
278         # The download process beyond this is a state machine.
279         # _add_active_peers will select the peers that we want to use
280         # for the download, and then attempt to start downloading. After
281         # each segment, it will check for doneness, reacting to broken
282         # peers and corrupt shares as necessary. If it runs out of good
283         # peers before downloading all of the segments, _done_deferred
284         # will errback.  Otherwise, it will eventually callback with the
285         # contents of the mutable file.
286         return self._done_deferred
287
288
289     def decode(self, blocks_and_salts, segnum):
290         """
291         I am a helper method that the mutable file update process uses
292         as a shortcut to decode and decrypt the segments that it needs
293         to fetch in order to perform a file update. I take in a
294         collection of blocks and salts, and pick some of those to make a
295         segment with. I return the plaintext associated with that
296         segment.
297         """
298         # shnum => block hash tree. Unused, but setup_encoding_parameters will
299         # want to set this.
300         # XXX: Make it so that it won't set this if we're just decoding.
301         self._block_hash_trees = None
302         self._setup_encoding_parameters()
303         # This is the form expected by decode.
304         blocks_and_salts = blocks_and_salts.items()
305         blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
306
307         d = self._decode_blocks(blocks_and_salts, segnum)
308         d.addCallback(self._decrypt_segment)
309         return d
310
311
312     def _setup_encoding_parameters(self):
313         """
314         I set up the encoding parameters, including k, n, the number
315         of segments associated with this file, and the segment decoder.
316         """
317         (seqnum,
318          root_hash,
319          IV,
320          segsize,
321          datalength,
322          k,
323          n,
324          known_prefix,
325          offsets_tuple) = self.verinfo
326         self._required_shares = k
327         self._total_shares = n
328         self._segment_size = segsize
329         self._data_length = datalength
330
331         if not IV:
332             self._version = MDMF_VERSION
333         else:
334             self._version = SDMF_VERSION
335
336         if datalength and segsize:
337             self._num_segments = mathutil.div_ceil(datalength, segsize)
338             self._tail_data_size = datalength % segsize
339         else:
340             self._num_segments = 0
341             self._tail_data_size = 0
342
343         self._segment_decoder = codec.CRSDecoder()
344         self._segment_decoder.set_params(segsize, k, n)
345
346         if  not self._tail_data_size:
347             self._tail_data_size = segsize
348
349         self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
350                                                          self._required_shares)
351         if self._tail_segment_size == self._segment_size:
352             self._tail_decoder = self._segment_decoder
353         else:
354             self._tail_decoder = codec.CRSDecoder()
355             self._tail_decoder.set_params(self._tail_segment_size,
356                                           self._required_shares,
357                                           self._total_shares)
358
359         self.log("got encoding parameters: "
360                  "k: %d "
361                  "n: %d "
362                  "%d segments of %d bytes each (%d byte tail segment)" % \
363                  (k, n, self._num_segments, self._segment_size,
364                   self._tail_segment_size))
365
366         if self._block_hash_trees is not None:
367             for i in xrange(self._total_shares):
368                 # So we don't have to do this later.
369                 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
370
371         # Our last task is to tell the downloader where to start and
372         # where to stop. We use three parameters for that:
373         #   - self._start_segment: the segment that we need to start
374         #     downloading from. 
375         #   - self._current_segment: the next segment that we need to
376         #     download.
377         #   - self._last_segment: The last segment that we were asked to
378         #     download.
379         #
380         #  We say that the download is complete when
381         #  self._current_segment > self._last_segment. We use
382         #  self._start_segment and self._last_segment to know when to
383         #  strip things off of segments, and how much to strip.
384         if self._offset:
385             self.log("got offset: %d" % self._offset)
386             # our start segment is the first segment containing the
387             # offset we were given. 
388             start = self._offset // self._segment_size
389
390             assert start < self._num_segments
391             self._start_segment = start
392             self.log("got start segment: %d" % self._start_segment)
393         else:
394             self._start_segment = 0
395
396
397         if self._read_length:
398             # our end segment is the last segment containing part of the
399             # segment that we were asked to read.
400             self.log("got read length %d" % self._read_length)
401             end_data = self._offset + self._read_length
402             # We don't actually need to read the byte at end_data, but
403             # the one before it.
404             end_data = end_data - 1
405             end = end_data // self._segment_size
406
407             assert end < self._num_segments
408             self._last_segment = end
409             self.log("got end segment: %d" % self._last_segment)
410         else:
411             self._last_segment = self._num_segments - 1
412
413         self._current_segment = self._start_segment
414
415     def _add_active_peers(self):
416         """
417         I populate self._active_readers with enough active readers to
418         retrieve the contents of this mutable file. I am called before
419         downloading starts, and (eventually) after each validation
420         error, connection error, or other problem in the download.
421         """
422         # TODO: It would be cool to investigate other heuristics for
423         # reader selection. For instance, the cost (in time the user
424         # spends waiting for their file) of selecting a really slow peer
425         # that happens to have a primary share is probably more than
426         # selecting a really fast peer that doesn't have a primary
427         # share. Maybe the servermap could be extended to provide this
428         # information; it could keep track of latency information while
429         # it gathers more important data, and then this routine could
430         # use that to select active readers.
431         #
432         # (these and other questions would be easier to answer with a
433         #  robust, configurable tahoe-lafs simulator, which modeled node
434         #  failures, differences in node speed, and other characteristics
435         #  that we expect storage servers to have.  You could have
436         #  presets for really stable grids (like allmydata.com),
437         #  friendnets, make it easy to configure your own settings, and
438         #  then simulate the effect of big changes on these use cases
439         #  instead of just reasoning about what the effect might be. Out
440         #  of scope for MDMF, though.)
441
442         # We need at least self._required_shares readers to download a
443         # segment.
444         if self._verify:
445             needed = self._total_shares
446         else:
447             needed = self._required_shares - len(self._active_readers)
448         # XXX: Why don't format= log messages work here?
449         self.log("adding %d peers to the active peers list" % needed)
450
451         # We favor lower numbered shares, since FEC is faster with
452         # primary shares than with other shares, and lower-numbered
453         # shares are more likely to be primary than higher numbered
454         # shares.
455         active_shnums = set(sorted(self.remaining_sharemap.keys()))
456         # We shouldn't consider adding shares that we already have; this
457         # will cause problems later.
458         active_shnums -= set([reader.shnum for reader in self._active_readers])
459         active_shnums = list(active_shnums)[:needed]
460         if len(active_shnums) < needed and not self._verify:
461             # We don't have enough readers to retrieve the file; fail.
462             return self._failed()
463
464         for shnum in active_shnums:
465             self._active_readers.append(self.readers[shnum])
466             self.log("added reader for share %d" % shnum)
467         assert len(self._active_readers) >= self._required_shares
468         # Conceptually, this is part of the _add_active_peers step. It
469         # validates the prefixes of newly added readers to make sure
470         # that they match what we are expecting for self.verinfo. If
471         # validation is successful, _validate_active_prefixes will call
472         # _download_current_segment for us. If validation is
473         # unsuccessful, then _validate_prefixes will remove the peer and
474         # call _add_active_peers again, where we will attempt to rectify
475         # the problem by choosing another peer.
476         return self._validate_active_prefixes()
477
478
479     def _validate_active_prefixes(self):
480         """
481         I check to make sure that the prefixes on the peers that I am
482         currently reading from match the prefix that we want to see, as
483         said in self.verinfo.
484
485         If I find that all of the active peers have acceptable prefixes,
486         I pass control to _download_current_segment, which will use
487         those peers to do cool things. If I find that some of the active
488         peers have unacceptable prefixes, I will remove them from active
489         peers (and from further consideration) and call
490         _add_active_peers to attempt to rectify the situation. I keep
491         track of which peers I have already validated so that I don't
492         need to do so again.
493         """
494         assert self._active_readers, "No more active readers"
495
496         ds = []
497         new_readers = set(self._active_readers) - self._validated_readers
498         self.log('validating %d newly-added active readers' % len(new_readers))
499
500         for reader in new_readers:
501             # We force a remote read here -- otherwise, we are relying
502             # on cached data that we already verified as valid, and we
503             # won't detect an uncoordinated write that has occurred
504             # since the last servermap update.
505             d = reader.get_prefix(force_remote=True)
506             d.addCallback(self._try_to_validate_prefix, reader)
507             ds.append(d)
508         dl = defer.DeferredList(ds, consumeErrors=True)
509         def _check_results(results):
510             # Each result in results will be of the form (success, msg).
511             # We don't care about msg, but success will tell us whether
512             # or not the checkstring validated. If it didn't, we need to
513             # remove the offending (peer,share) from our active readers,
514             # and ensure that active readers is again populated.
515             bad_readers = []
516             for i, result in enumerate(results):
517                 if not result[0]:
518                     reader = self._active_readers[i]
519                     f = result[1]
520                     assert isinstance(f, failure.Failure)
521
522                     self.log("The reader %s failed to "
523                              "properly validate: %s" % \
524                              (reader, str(f.value)))
525                     bad_readers.append((reader, f))
526                 else:
527                     reader = self._active_readers[i]
528                     self.log("the reader %s checks out, so we'll use it" % \
529                              reader)
530                     self._validated_readers.add(reader)
531                     # Each time we validate a reader, we check to see if
532                     # we need the private key. If we do, we politely ask
533                     # for it and then continue computing. If we find
534                     # that we haven't gotten it at the end of
535                     # segment decoding, then we'll take more drastic
536                     # measures.
537                     if self._need_privkey and not self._node.is_readonly():
538                         d = reader.get_encprivkey()
539                         d.addCallback(self._try_to_validate_privkey, reader)
540             if bad_readers:
541                 # We do them all at once, or else we screw up list indexing.
542                 for (reader, f) in bad_readers:
543                     self._mark_bad_share(reader, f)
544                 if self._verify:
545                     if len(self._active_readers) >= self._required_shares:
546                         return self._download_current_segment()
547                     else:
548                         return self._failed()
549                 else:
550                     return self._add_active_peers()
551             else:
552                 return self._download_current_segment()
553             # The next step will assert that it has enough active
554             # readers to fetch shares; we just need to remove it.
555         dl.addCallback(_check_results)
556         return dl
557
558
559     def _try_to_validate_prefix(self, prefix, reader):
560         """
561         I check that the prefix returned by a candidate server for
562         retrieval matches the prefix that the servermap knows about
563         (and, hence, the prefix that was validated earlier). If it does,
564         I return True, which means that I approve of the use of the
565         candidate server for segment retrieval. If it doesn't, I return
566         False, which means that another server must be chosen.
567         """
568         (seqnum,
569          root_hash,
570          IV,
571          segsize,
572          datalength,
573          k,
574          N,
575          known_prefix,
576          offsets_tuple) = self.verinfo
577         if known_prefix != prefix:
578             self.log("prefix from share %d doesn't match" % reader.shnum)
579             raise UncoordinatedWriteError("Mismatched prefix -- this could "
580                                           "indicate an uncoordinated write")
581         # Otherwise, we're okay -- no issues.
582
583
584     def _remove_reader(self, reader):
585         """
586         At various points, we will wish to remove a peer from
587         consideration and/or use. These include, but are not necessarily
588         limited to:
589
590             - A connection error.
591             - A mismatched prefix (that is, a prefix that does not match
592               our conception of the version information string).
593             - A failing block hash, salt hash, or share hash, which can
594               indicate disk failure/bit flips, or network trouble.
595
596         This method will do that. I will make sure that the
597         (shnum,reader) combination represented by my reader argument is
598         not used for anything else during this download. I will not
599         advise the reader of any corruption, something that my callers
600         may wish to do on their own.
601         """
602         # TODO: When you're done writing this, see if this is ever
603         # actually used for something that _mark_bad_share isn't. I have
604         # a feeling that they will be used for very similar things, and
605         # that having them both here is just going to be an epic amount
606         # of code duplication.
607         #
608         # (well, okay, not epic, but meaningful)
609         self.log("removing reader %s" % reader)
610         # Remove the reader from _active_readers
611         self._active_readers.remove(reader)
612         # TODO: self.readers.remove(reader)?
613         for shnum in list(self.remaining_sharemap.keys()):
614             self.remaining_sharemap.discard(shnum, reader.peerid)
615
616
617     def _mark_bad_share(self, reader, f):
618         """
619         I mark the (peerid, shnum) encapsulated by my reader argument as
620         a bad share, which means that it will not be used anywhere else.
621
622         There are several reasons to want to mark something as a bad
623         share. These include:
624
625             - A connection error to the peer.
626             - A mismatched prefix (that is, a prefix that does not match
627               our local conception of the version information string).
628             - A failing block hash, salt hash, share hash, or other
629               integrity check.
630
631         This method will ensure that readers that we wish to mark bad
632         (for these reasons or other reasons) are not used for the rest
633         of the download. Additionally, it will attempt to tell the
634         remote peer (with no guarantee of success) that its share is
635         corrupt.
636         """
637         self.log("marking share %d on server %s as bad" % \
638                  (reader.shnum, reader))
639         prefix = self.verinfo[-2]
640         self.servermap.mark_bad_share(reader.peerid,
641                                       reader.shnum,
642                                       prefix)
643         self._remove_reader(reader)
644         self._bad_shares.add((reader.peerid, reader.shnum, f))
645         self._status.problems[reader.peerid] = f
646         self._last_failure = f
647         self.notify_server_corruption(reader.peerid, reader.shnum,
648                                       str(f.value))
649
650
651     def _download_current_segment(self):
652         """
653         I download, validate, decode, decrypt, and assemble the segment
654         that this Retrieve is currently responsible for downloading.
655         """
656         assert len(self._active_readers) >= self._required_shares
657         if self._current_segment <= self._last_segment:
658             d = self._process_segment(self._current_segment)
659         else:
660             d = defer.succeed(None)
661         d.addBoth(self._turn_barrier)
662         d.addCallback(self._check_for_done)
663         return d
664
665
666     def _turn_barrier(self, result):
667         """
668         I help the download process avoid the recursion limit issues
669         discussed in #237.
670         """
671         return fireEventually(result)
672
673
674     def _process_segment(self, segnum):
675         """
676         I download, validate, decode, and decrypt one segment of the
677         file that this Retrieve is retrieving. This means coordinating
678         the process of getting k blocks of that file, validating them,
679         assembling them into one segment with the decoder, and then
680         decrypting them.
681         """
682         self.log("processing segment %d" % segnum)
683
684         # TODO: The old code uses a marker. Should this code do that
685         # too? What did the Marker do?
686         assert len(self._active_readers) >= self._required_shares
687
688         # We need to ask each of our active readers for its block and
689         # salt. We will then validate those. If validation is
690         # successful, we will assemble the results into plaintext.
691         ds = []
692         for reader in self._active_readers:
693             started = time.time()
694             d = reader.get_block_and_salt(segnum, queue=True)
695             d2 = self._get_needed_hashes(reader, segnum)
696             dl = defer.DeferredList([d, d2], consumeErrors=True)
697             dl.addCallback(self._validate_block, segnum, reader, started)
698             dl.addErrback(self._validation_or_decoding_failed, [reader])
699             ds.append(dl)
700             reader.flush()
701         dl = defer.DeferredList(ds)
702         if self._verify:
703             dl.addCallback(lambda ignored: "")
704             dl.addCallback(self._set_segment)
705         else:
706             dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
707         return dl
708
709
710     def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
711         """
712         I take the results of fetching and validating the blocks from a
713         callback chain in another method. If the results are such that
714         they tell me that validation and fetching succeeded without
715         incident, I will proceed with decoding and decryption.
716         Otherwise, I will do nothing.
717         """
718         self.log("trying to decode and decrypt segment %d" % segnum)
719         failures = False
720         for block_and_salt in blocks_and_salts:
721             if not block_and_salt[0] or block_and_salt[1] == None:
722                 self.log("some validation operations failed; not proceeding")
723                 failures = True
724                 break
725         if not failures:
726             self.log("everything looks ok, building segment %d" % segnum)
727             d = self._decode_blocks(blocks_and_salts, segnum)
728             d.addCallback(self._decrypt_segment)
729             d.addErrback(self._validation_or_decoding_failed,
730                          self._active_readers)
731             # check to see whether we've been paused before writing
732             # anything.
733             d.addCallback(self._check_for_paused)
734             d.addCallback(self._set_segment)
735             return d
736         else:
737             return defer.succeed(None)
738
739
740     def _set_segment(self, segment):
741         """
742         Given a plaintext segment, I register that segment with the
743         target that is handling the file download.
744         """
745         self.log("got plaintext for segment %d" % self._current_segment)
746         if self._current_segment == self._start_segment:
747             # We're on the first segment. It's possible that we want
748             # only some part of the end of this segment, and that we
749             # just downloaded the whole thing to get that part. If so,
750             # we need to account for that and give the reader just the
751             # data that they want.
752             n = self._offset % self._segment_size
753             self.log("stripping %d bytes off of the first segment" % n)
754             self.log("original segment length: %d" % len(segment))
755             segment = segment[n:]
756             self.log("new segment length: %d" % len(segment))
757
758         if self._current_segment == self._last_segment and self._read_length is not None:
759             # We're on the last segment. It's possible that we only want
760             # part of the beginning of this segment, and that we
761             # downloaded the whole thing anyway. Make sure to give the
762             # caller only the portion of the segment that they want to
763             # receive.
764             extra = self._read_length
765             if self._start_segment != self._last_segment:
766                 extra -= self._segment_size - \
767                             (self._offset % self._segment_size)
768             extra %= self._segment_size
769             self.log("original segment length: %d" % len(segment))
770             segment = segment[:extra]
771             self.log("new segment length: %d" % len(segment))
772             self.log("only taking %d bytes of the last segment" % extra)
773
774         if not self._verify:
775             self._consumer.write(segment)
776         else:
777             # we don't care about the plaintext if we are doing a verify.
778             segment = None
779         self._current_segment += 1
780
781
782     def _validation_or_decoding_failed(self, f, readers):
783         """
784         I am called when a block or a salt fails to correctly validate, or when
785         the decryption or decoding operation fails for some reason.  I react to
786         this failure by notifying the remote server of corruption, and then
787         removing the remote peer from further activity.
788         """
789         assert isinstance(readers, list)
790         bad_shnums = [reader.shnum for reader in readers]
791
792         self.log("validation or decoding failed on share(s) %s, peer(s) %s "
793                  ", segment %d: %s" % \
794                  (bad_shnums, readers, self._current_segment, str(f)))
795         for reader in readers:
796             self._mark_bad_share(reader, f)
797         return
798
799
800     def _validate_block(self, results, segnum, reader, started):
801         """
802         I validate a block from one share on a remote server.
803         """
804         # Grab the part of the block hash tree that is necessary to
805         # validate this block, then generate the block hash root.
806         self.log("validating share %d for segment %d" % (reader.shnum,
807                                                              segnum))
808         self._status.add_fetch_timing(reader.peerid, started)
809         self._status.set_status("Valdiating blocks for segment %d" % segnum)
810         # Did we fail to fetch either of the things that we were
811         # supposed to? Fail if so.
812         if not results[0][0] and results[1][0]:
813             # handled by the errback handler.
814
815             # These all get batched into one query, so the resulting
816             # failure should be the same for all of them, so we can just
817             # use the first one.
818             assert isinstance(results[0][1], failure.Failure)
819
820             f = results[0][1]
821             raise CorruptShareError(reader.peerid,
822                                     reader.shnum,
823                                     "Connection error: %s" % str(f))
824
825         block_and_salt, block_and_sharehashes = results
826         block, salt = block_and_salt[1]
827         blockhashes, sharehashes = block_and_sharehashes[1]
828
829         blockhashes = dict(enumerate(blockhashes[1]))
830         self.log("the reader gave me the following blockhashes: %s" % \
831                  blockhashes.keys())
832         self.log("the reader gave me the following sharehashes: %s" % \
833                  sharehashes[1].keys())
834         bht = self._block_hash_trees[reader.shnum]
835
836         if bht.needed_hashes(segnum, include_leaf=True):
837             try:
838                 bht.set_hashes(blockhashes)
839             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
840                     IndexError), e:
841                 raise CorruptShareError(reader.peerid,
842                                         reader.shnum,
843                                         "block hash tree failure: %s" % e)
844
845         if self._version == MDMF_VERSION:
846             blockhash = hashutil.block_hash(salt + block)
847         else:
848             blockhash = hashutil.block_hash(block)
849         # If this works without an error, then validation is
850         # successful.
851         try:
852            bht.set_hashes(leaves={segnum: blockhash})
853         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
854                 IndexError), e:
855             raise CorruptShareError(reader.peerid,
856                                     reader.shnum,
857                                     "block hash tree failure: %s" % e)
858
859         # Reaching this point means that we know that this segment
860         # is correct. Now we need to check to see whether the share
861         # hash chain is also correct. 
862         # SDMF wrote share hash chains that didn't contain the
863         # leaves, which would be produced from the block hash tree.
864         # So we need to validate the block hash tree first. If
865         # successful, then bht[0] will contain the root for the
866         # shnum, which will be a leaf in the share hash tree, which
867         # will allow us to validate the rest of the tree.
868         if self.share_hash_tree.needed_hashes(reader.shnum,
869                                               include_leaf=True) or \
870                                               self._verify:
871             try:
872                 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
873                                             leaves={reader.shnum: bht[0]})
874             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
875                     IndexError), e:
876                 raise CorruptShareError(reader.peerid,
877                                         reader.shnum,
878                                         "corrupt hashes: %s" % e)
879
880         self.log('share %d is valid for segment %d' % (reader.shnum,
881                                                        segnum))
882         return {reader.shnum: (block, salt)}
883
884
885     def _get_needed_hashes(self, reader, segnum):
886         """
887         I get the hashes needed to validate segnum from the reader, then return
888         to my caller when this is done.
889         """
890         bht = self._block_hash_trees[reader.shnum]
891         needed = bht.needed_hashes(segnum, include_leaf=True)
892         # The root of the block hash tree is also a leaf in the share
893         # hash tree. So we don't need to fetch it from the remote
894         # server. In the case of files with one segment, this means that
895         # we won't fetch any block hash tree from the remote server,
896         # since the hash of each share of the file is the entire block
897         # hash tree, and is a leaf in the share hash tree. This is fine,
898         # since any share corruption will be detected in the share hash
899         # tree.
900         #needed.discard(0)
901         self.log("getting blockhashes for segment %d, share %d: %s" % \
902                  (segnum, reader.shnum, str(needed)))
903         d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
904         if self.share_hash_tree.needed_hashes(reader.shnum):
905             need = self.share_hash_tree.needed_hashes(reader.shnum)
906             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
907                                                                  str(need)))
908             d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
909         else:
910             d2 = defer.succeed({}) # the logic in the next method
911                                    # expects a dict
912         dl = defer.DeferredList([d1, d2], consumeErrors=True)
913         return dl
914
915
916     def _decode_blocks(self, blocks_and_salts, segnum):
917         """
918         I take a list of k blocks and salts, and decode that into a
919         single encrypted segment.
920         """
921         d = {}
922         # We want to merge our dictionaries to the form 
923         # {shnum: blocks_and_salts}
924         #
925         # The dictionaries come from validate block that way, so we just
926         # need to merge them.
927         for block_and_salt in blocks_and_salts:
928             d.update(block_and_salt[1])
929
930         # All of these blocks should have the same salt; in SDMF, it is
931         # the file-wide IV, while in MDMF it is the per-segment salt. In
932         # either case, we just need to get one of them and use it.
933         #
934         # d.items()[0] is like (shnum, (block, salt))
935         # d.items()[0][1] is like (block, salt)
936         # d.items()[0][1][1] is the salt.
937         salt = d.items()[0][1][1]
938         # Next, extract just the blocks from the dict. We'll use the
939         # salt in the next step.
940         share_and_shareids = [(k, v[0]) for k, v in d.items()]
941         d2 = dict(share_and_shareids)
942         shareids = []
943         shares = []
944         for shareid, share in d2.items():
945             shareids.append(shareid)
946             shares.append(share)
947
948         self._status.set_status("Decoding")
949         started = time.time()
950         assert len(shareids) >= self._required_shares, len(shareids)
951         # zfec really doesn't want extra shares
952         shareids = shareids[:self._required_shares]
953         shares = shares[:self._required_shares]
954         self.log("decoding segment %d" % segnum)
955         if segnum == self._num_segments - 1:
956             d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
957         else:
958             d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
959         def _process(buffers):
960             segment = "".join(buffers)
961             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
962                      segnum=segnum,
963                      numsegs=self._num_segments,
964                      level=log.NOISY)
965             self.log(" joined length %d, datalength %d" %
966                      (len(segment), self._data_length))
967             if segnum == self._num_segments - 1:
968                 size_to_use = self._tail_data_size
969             else:
970                 size_to_use = self._segment_size
971             segment = segment[:size_to_use]
972             self.log(" segment len=%d" % len(segment))
973             self._status.timings.setdefault("decode", 0)
974             self._status.timings['decode'] = time.time() - started
975             return segment, salt
976         d.addCallback(_process)
977         return d
978
979
980     def _decrypt_segment(self, segment_and_salt):
981         """
982         I take a single segment and its salt, and decrypt it. I return
983         the plaintext of the segment that is in my argument.
984         """
985         segment, salt = segment_and_salt
986         self._status.set_status("decrypting")
987         self.log("decrypting segment %d" % self._current_segment)
988         started = time.time()
989         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
990         decryptor = AES(key)
991         plaintext = decryptor.process(segment)
992         self._status.timings.setdefault("decrypt", 0)
993         self._status.timings['decrypt'] = time.time() - started
994         return plaintext
995
996
997     def notify_server_corruption(self, peerid, shnum, reason):
998         ss = self.servermap.connections[peerid]
999         ss.callRemoteOnly("advise_corrupt_share",
1000                           "mutable", self._storage_index, shnum, reason)
1001
1002
1003     def _try_to_validate_privkey(self, enc_privkey, reader):
1004         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
1005         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
1006         if alleged_writekey != self._node.get_writekey():
1007             self.log("invalid privkey from %s shnum %d" %
1008                      (reader, reader.shnum),
1009                      level=log.WEIRD, umid="YIw4tA")
1010             if self._verify:
1011                 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
1012                                               self.verinfo[-2])
1013                 e = CorruptShareError(reader.peerid,
1014                                       reader.shnum,
1015                                       "invalid privkey")
1016                 f = failure.Failure(e)
1017                 self._bad_shares.add((reader.peerid, reader.shnum, f))
1018             return
1019
1020         # it's good
1021         self.log("got valid privkey from shnum %d on reader %s" %
1022                  (reader.shnum, reader))
1023         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
1024         self._node._populate_encprivkey(enc_privkey)
1025         self._node._populate_privkey(privkey)
1026         self._need_privkey = False
1027
1028
1029     def _check_for_done(self, res):
1030         """
1031         I check to see if this Retrieve object has successfully finished
1032         its work.
1033
1034         I can exit in the following ways:
1035             - If there are no more segments to download, then I exit by
1036               causing self._done_deferred to fire with the plaintext
1037               content requested by the caller.
1038             - If there are still segments to be downloaded, and there
1039               are enough active readers (readers which have not broken
1040               and have not given us corrupt data) to continue
1041               downloading, I send control back to
1042               _download_current_segment.
1043             - If there are still segments to be downloaded but there are
1044               not enough active peers to download them, I ask
1045               _add_active_peers to add more peers. If it is successful,
1046               it will call _download_current_segment. If there are not
1047               enough peers to retrieve the file, then that will cause
1048               _done_deferred to errback.
1049         """
1050         self.log("checking for doneness")
1051         if self._current_segment > self._last_segment:
1052             # No more segments to download, we're done.
1053             self.log("got plaintext, done")
1054             return self._done()
1055
1056         if len(self._active_readers) >= self._required_shares:
1057             # More segments to download, but we have enough good peers
1058             # in self._active_readers that we can do that without issue,
1059             # so go nab the next segment.
1060             self.log("not done yet: on segment %d of %d" % \
1061                      (self._current_segment + 1, self._num_segments))
1062             return self._download_current_segment()
1063
1064         self.log("not done yet: on segment %d of %d, need to add peers" % \
1065                  (self._current_segment + 1, self._num_segments))
1066         return self._add_active_peers()
1067
1068
1069     def _done(self):
1070         """
1071         I am called by _check_for_done when the download process has
1072         finished successfully. After making some useful logging
1073         statements, I return the decrypted contents to the owner of this
1074         Retrieve object through self._done_deferred.
1075         """
1076         self._running = False
1077         self._status.set_active(False)
1078         now = time.time()
1079         self._status.timings['total'] = now - self._started
1080         self._status.timings['fetch'] = now - self._started_fetching
1081
1082         if self._verify:
1083             ret = list(self._bad_shares)
1084             self.log("done verifying, found %d bad shares" % len(ret))
1085         else:
1086             # TODO: upload status here?
1087             ret = self._consumer
1088             self._consumer.unregisterProducer()
1089         eventually(self._done_deferred.callback, ret)
1090
1091
1092     def _failed(self):
1093         """
1094         I am called by _add_active_peers when there are not enough
1095         active peers left to complete the download. After making some
1096         useful logging statements, I return an exception to that effect
1097         to the caller of this Retrieve object through
1098         self._done_deferred.
1099         """
1100         self._running = False
1101         self._status.set_active(False)
1102         now = time.time()
1103         self._status.timings['total'] = now - self._started
1104         self._status.timings['fetch'] = now - self._started_fetching
1105
1106         if self._verify:
1107             ret = list(self._bad_shares)
1108         else:
1109             format = ("ran out of peers: "
1110                       "have %(have)d of %(total)d segments "
1111                       "found %(bad)d bad shares "
1112                       "encoding %(k)d-of-%(n)d")
1113             args = {"have": self._current_segment,
1114                     "total": self._num_segments,
1115                     "need": self._last_segment,
1116                     "k": self._required_shares,
1117                     "n": self._total_shares,
1118                     "bad": len(self._bad_shares)}
1119             e = NotEnoughSharesError("%s, last failure: %s" % \
1120                                      (format % args, str(self._last_failure)))
1121             f = failure.Failure(e)
1122             ret = f
1123         eventually(self._done_deferred.callback, ret)