
import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
                                 MDMF_VERSION, SDMF_VERSION
from allmydata.util import hashutil, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["decode"] = 0.0
        self.timings["decrypt"] = 0.0
        self.timings["cumulative_verify"] = 0.0
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def add_fetch_timing(self, peerid, elapsed):
        if peerid not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][peerid] = []
        self.timings["fetch_per_server"][peerid].append(elapsed)
    def accumulate_decode_time(self, elapsed):
        self.timings["decode"] += elapsed
    def accumulate_decrypt_time(self, elapsed):
        self.timings["decrypt"] += elapsed
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value

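# A Marker is just a unique, opaque token; per the comment in
# Retrieve.__init__, self._privkey_query_markers is meant to hold one of
# these for each attempt to fetch the encrypted private key.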
class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
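    #
    # Typical use (sketch): a caller that already has a servermap and a
    # verinfo tuple does something like
    #     r = Retrieve(filenode, servermap, verinfo)
    #     d = r.download(consumer)
    # and the Deferred fires with the consumer once every needed segment
    # has been fetched, validated, decoded, decrypted, and written to it.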
    implements(IPushProducer)

    def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
                 verify=False):
        self._node = filenode
        assert self._node.get_pubkey()
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._outstanding_queries = {} # maps (peerid,shnum) to start_time
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = verify or (fetch_privkey
                                        and not self._node.get_privkey())

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            # at once.
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        #
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the
        #    servermap update?
        self._verify = verify

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self.readers = {}
        self._pause_deferred = None
        self._offset = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def _set_current_status(self, state):
        seg = "%d/%d" % (self._current_segment, self._last_segment)
        self._status.set_status("segment %s (%s)" % (seg, state))

    ###################
    # IPushProducer

    def pauseProducing(self):
        """
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.
        """
        if self._pause_deferred is not None:
            return

        # fired when the download is unpaused.
        self._old_status = self._status.get_status()
        self._set_current_status("paused")

        self._pause_deferred = defer.Deferred()


    def resumeProducing(self):
        """
        I am called by my download target once it is ready to begin
        receiving data again.
        """
        if self._pause_deferred is None:
            return

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)


    def _check_for_paused(self, res):
        """
        I am called just before a write to the consumer. I return a
        Deferred that eventually fires with the data that is to be
        written to the consumer. If the download has not been paused,
        the Deferred fires immediately. Otherwise, the Deferred fires
        when the downloader is unpaused.
        """
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
            return d
        return defer.succeed(res)


    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify

        if consumer:
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            # IConsumer.
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._offset = offset
        self._read_length = size
        self._setup_download()
        self._setup_encoding_parameters()
        self.log("starting download")
        self._started_fetching = time.time()
        d = self._add_active_peers()
        # ...
        # The download process beyond this is a state machine.
        # _add_active_peers will select the peers that we want to use
        # for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # peers and corrupt shares as necessary. If it runs out of good
        # peers before downloading all of the segments, _done_deferred
        # will errback.  Otherwise, it will eventually callback with the
        # contents of the mutable file.
        return self._done_deferred

    def _setup_download(self):
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # how many shares do we need?
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         prefix,
         offsets_tuple) = self.verinfo

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)
            # If the servermap update fetched anything, it fetched at least 1
            # KiB, so we ask for that much.
            # TODO: Change the cache methods to allow us to fetch all of the
            # data that they have, then change this method to do that.
            any_cache = self._node._read_from_cache(self.verinfo, shnum,
                                                    0, 1000)
            ss = self.servermap.connections[peerid]
            reader = MDMFSlotReadProxy(ss,
                                       self._storage_index,
                                       shnum,
                                       any_cache)
            reader.peerid = peerid
            self.readers[shnum] = reader
        assert len(self.remaining_sharemap) >= k

        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._validated_readers = set() # set of readers that we have
                                        # validated the prefix of
        self._block_hash_trees = {} # shnum => hashtree

        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})
    def decode(self, blocks_and_salts, segnum):
        """
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that
        segment.
        """
        # shnum => block hash tree. Unused, but setup_encoding_parameters will
        # want to set this.
        self._block_hash_trees = None
        self._setup_encoding_parameters()

        # Massage {shnum: (block, salt)} into the list of
        # (success, [(shnum, (block, salt))]) pairs that _decode_blocks
        # expects (the shape it would normally get from a DeferredList).
        blocks_and_salts = blocks_and_salts.items()
        blocks_and_salts = [(True, [d]) for d in blocks_and_salts]

        d = self._decode_blocks(blocks_and_salts, segnum)
        d.addCallback(self._decrypt_segment)
        return d


    def _setup_encoding_parameters(self):
        """
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoders.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         n,
         known_prefix,
         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

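        # The verinfo of an SDMF file carries a file-wide IV; MDMF leaves
        # it empty and stores a per-segment salt next to each block, so
        # the presence of an IV is what distinguishes the two formats
        # here.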
        if not IV:
            self._version = MDMF_VERSION
        else:
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
        else:
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

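        # The tail segment gets padded up to a multiple of k, since the
        # CRS encoding splits each segment into k equal-sized blocks.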
        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
        else:
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,
                                          self._total_shares)

        self.log("got encoding parameters: "
                 "k: %d "
                 "n: %d "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))

        if self._block_hash_trees is not None:
            for i in xrange(self._total_shares):
                # So we don't have to do this later.
                self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        #   - self._start_segment: the segment that we need to start
        #     downloading from.
        #   - self._current_segment: the next segment that we need to
        #     download.
        #   - self._last_segment: The last segment that we were asked to
        #     download.
        #
        #  We say that the download is complete when
        #  self._current_segment > self._last_segment. We use
        #  self._start_segment and self._last_segment to know when to
        #  strip things off of segments, and how much to strip.
        if self._offset:
            self.log("got offset: %d" % self._offset)
            # our start segment is the first segment containing the
            # offset we were given.
            start = self._offset // self._segment_size

            assert start < self._num_segments
            self._start_segment = start
            self.log("got start segment: %d" % self._start_segment)
        else:
            self._start_segment = 0


        # If self._read_length is None, then we want to read the whole
        # file. Otherwise, we want to read only part of the file, and
        # need to figure out where to stop reading.
        if self._read_length is not None:
            # our end segment is the last segment containing part of the
            # segment that we were asked to read.
            self.log("got read length %d" % self._read_length)
            if self._read_length != 0:
                end_data = self._offset + self._read_length

                # We don't actually need to read the byte at end_data,
                # but the one before it.
                end = (end_data - 1) // self._segment_size

                assert end < self._num_segments
                self._last_segment = end
            else:
                self._last_segment = self._start_segment
            self.log("got end segment: %d" % self._last_segment)
        else:
            self._last_segment = self._num_segments - 1

        self._current_segment = self._start_segment

    def _add_active_peers(self):
        """
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.
        """
        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow peer
        # that happens to have a primary share is probably more than
        # selecting a really fast peer that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.
        #
        # (these and other questions would be easier to answer with a
        #  robust, configurable tahoe-lafs simulator, which modeled node
        #  failures, differences in node speed, and other characteristics
        #  that we expect storage servers to have.  You could have
        #  presets for really stable grids (like allmydata.com),
        #  friendnets, make it easy to configure your own settings, and
        #  then simulate the effect of big changes on these use cases
        #  instead of just reasoning about what the effect might be. Out
        #  of scope for MDMF, though.)

        # We need at least self._required_shares readers to download a
        # segment.
        if self._verify:
            needed = self._total_shares
        else:
            needed = self._required_shares - len(self._active_readers)
        # XXX: Why don't format= log messages work here?
        self.log("adding %d peers to the active peers list" % needed)

        # We favor lower numbered shares, since FEC is faster with
        # primary shares than with other shares, and lower-numbered
        # shares are more likely to be primary than higher numbered
        # shares.
        active_shnums = set(sorted(self.remaining_sharemap.keys()))
        # We shouldn't consider adding shares that we already have; this
        # will cause problems later.
        active_shnums -= set([reader.shnum for reader in self._active_readers])
        active_shnums = list(active_shnums)[:needed]
        if len(active_shnums) < needed and not self._verify:
            # We don't have enough readers to retrieve the file; fail.
            return self._failed()

        for shnum in active_shnums:
            self._active_readers.append(self.readers[shnum])
            self.log("added reader for share %d" % shnum)
        assert len(self._active_readers) >= self._required_shares
        # Conceptually, this is part of the _add_active_peers step. It
        # validates the prefixes of newly added readers to make sure
        # that they match what we are expecting for self.verinfo. If
        # validation is successful, _validate_active_prefixes will call
        # _download_current_segment for us. If validation is
        # unsuccessful, then _validate_active_prefixes will remove the
        # peer and call _add_active_peers again, where we will attempt
        # to rectify the problem by choosing another peer.
        return self._validate_active_prefixes()


    def _validate_active_prefixes(self):
        """
        I check to make sure that the prefixes on the peers that I am
        currently reading from match the prefix that we want to see, as
        said in self.verinfo.

        If I find that all of the active peers have acceptable prefixes,
        I pass control to _download_current_segment, which will use
        those peers to do cool things. If I find that some of the active
        peers have unacceptable prefixes, I will remove them from active
        peers (and from further consideration) and call
        _add_active_peers to attempt to rectify the situation. I keep
        track of which peers I have already validated so that I don't
        need to do so again.
        """
        assert self._active_readers, "No more active readers"

        ds = []
        new_readers = set(self._active_readers) - self._validated_readers
        self.log('validating %d newly-added active readers' % len(new_readers))

        for reader in new_readers:
            # We force a remote read here -- otherwise, we are relying
            # on cached data that we already verified as valid, and we
            # won't detect an uncoordinated write that has occurred
            # since the last servermap update.
            d = reader.get_prefix(force_remote=True)
            d.addCallback(self._try_to_validate_prefix, reader)
            ds.append(d)
        dl = defer.DeferredList(ds, consumeErrors=True)
        def _check_results(results):
            # Each result in results will be of the form (success, msg).
            # We don't care about msg, but success will tell us whether
            # or not the checkstring validated. If it didn't, we need to
            # remove the offending (peer,share) from our active readers,
            # and ensure that active readers is again populated.
            bad_readers = []
            for i, result in enumerate(results):
                if not result[0]:
                    reader = self._active_readers[i]
                    f = result[1]
                    assert isinstance(f, failure.Failure)

                    self.log("The reader %s failed to "
                             "properly validate: %s" % \
                             (reader, str(f.value)))
                    bad_readers.append((reader, f))
                else:
                    reader = self._active_readers[i]
                    self.log("the reader %s checks out, so we'll use it" % \
                             reader)
                    self._validated_readers.add(reader)
                    # Each time we validate a reader, we check to see if
                    # we need the private key. If we do, we politely ask
                    # for it and then continue computing. If we find
                    # that we haven't gotten it at the end of
                    # segment decoding, then we'll take more drastic
                    # measures.
                    if self._need_privkey and not self._node.is_readonly():
                        d = reader.get_encprivkey()
                        d.addCallback(self._try_to_validate_privkey, reader)
            if bad_readers:
                # We do them all at once, or else we screw up list indexing.
                for (reader, f) in bad_readers:
                    self._mark_bad_share(reader, f)
                if self._verify:
                    if len(self._active_readers) >= self._required_shares:
                        return self._download_current_segment()
                    else:
                        return self._failed()
                else:
                    return self._add_active_peers()
            else:
                return self._download_current_segment()
            # The next step will assert that it has enough active
            # readers to fetch shares; we just need to remove it.
        dl.addCallback(_check_results)
        return dl


    def _try_to_validate_prefix(self, prefix, reader):
        """
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I return True, which means that I approve of the use of the
        candidate server for segment retrieval. If it doesn't, I return
        False, which means that another server must be chosen.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         known_prefix,
         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.


    def _remove_reader(self, reader):
        """
        At various points, we will wish to remove a peer from
        consideration and/or use. These include, but are not necessarily
        limited to:

            - A connection error.
            - A mismatched prefix (that is, a prefix that does not match
              our conception of the version information string).
            - A failing block hash, salt hash, or share hash, which can
              indicate disk failure/bit flips, or network trouble.

        This method will do that. I will make sure that the
        (shnum,reader) combination represented by my reader argument is
        not used for anything else during this download. I will not
        advise the reader of any corruption, something that my callers
        may wish to do on their own.
        """
        # TODO: When you're done writing this, see if this is ever
        # actually used for something that _mark_bad_share isn't. I have
        # a feeling that they will be used for very similar things, and
        # that having them both here is just going to be an epic amount
        # of code duplication.
        #
        # (well, okay, not epic, but meaningful)
        self.log("removing reader %s" % reader)
        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        # TODO: self.readers.remove(reader)?
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.peerid)


    def _mark_bad_share(self, reader, f):
        """
        I mark the (peerid, shnum) encapsulated by my reader argument as
        a bad share, which means that it will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

            - A connection error to the peer.
            - A mismatched prefix (that is, a prefix that does not match
              our local conception of the version information string).
            - A failing block hash, salt hash, share hash, or other
              integrity check.

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote peer (with no guarantee of success) that its share is
        corrupt.
        """
        self.log("marking share %d on server %s as bad" % \
                 (reader.shnum, reader))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(reader.peerid,
                                      reader.shnum,
                                      prefix)
        self._remove_reader(reader)
        self._bad_shares.add((reader.peerid, reader.shnum, f))
        self._status.problems[reader.peerid] = f
        self._last_failure = f
        self.notify_server_corruption(reader.peerid, reader.shnum,
                                      str(f.value))


    def _download_current_segment(self):
        """
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.
        """
        assert len(self._active_readers) >= self._required_shares
        if self._current_segment <= self._last_segment:
            d = self._process_segment(self._current_segment)
        else:
            d = defer.succeed(None)
        d.addBoth(self._turn_barrier)
        d.addCallback(self._check_for_done)
        return d


    def _turn_barrier(self, result):
        """
        I help the download process avoid the recursion limit issues
        discussed in #237.
        """
        return fireEventually(result)


    def _process_segment(self, segnum):
        """
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then
        decrypting them.
        """
        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?
        assert len(self._active_readers) >= self._required_shares

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        ds = []
        for reader in self._active_readers:
            started = time.time()
            d = reader.get_block_and_salt(segnum, queue=True)
            d2 = self._get_needed_hashes(reader, segnum)
            dl = defer.DeferredList([d, d2], consumeErrors=True)
            dl.addCallback(self._validate_block, segnum, reader, started)
            dl.addErrback(self._validation_or_decoding_failed, [reader])
            ds.append(dl)
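            # The reads above were only queued (queue=True); flush()
            # sends everything queued for this reader as one batched
            # request.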
            reader.flush()
        dl = defer.DeferredList(ds)
        if self._verify:
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
        else:
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
        return dl


    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
        """
        I take the results of fetching and validating the blocks from a
        callback chain in another method. If the results are such that
        they tell me that validation and fetching succeeded without
        incident, I will proceed with decoding and decryption.
        Otherwise, I will do nothing.
        """
        self.log("trying to decode and decrypt segment %d" % segnum)
        failures = False
        for block_and_salt in blocks_and_salts:
            if not block_and_salt[0] or block_and_salt[1] is None:
                self.log("some validation operations failed; not proceeding")
                failures = True
                break
        if not failures:
            self.log("everything looks ok, building segment %d" % segnum)
            d = self._decode_blocks(blocks_and_salts, segnum)
            d.addCallback(self._decrypt_segment)
            d.addErrback(self._validation_or_decoding_failed,
                         self._active_readers)
            # check to see whether we've been paused before writing
            # anything.
            d.addCallback(self._check_for_paused)
            d.addCallback(self._set_segment)
            return d
        else:
            return defer.succeed(None)


    def _set_segment(self, segment):
        """
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.
        """
        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            # receive.
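            # For example, with segsize=100, offset=150, read_length=75,
            # we download segments 1 and 2, strip 50 bytes from the front
            # of segment 1 above, and here keep extra = (75 - 50) % 100 =
            # 25 bytes of segment 2, for 75 bytes in all.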
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                            (self._offset % self._segment_size)
            extra %= self._segment_size
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)

        if not self._verify:
            self._consumer.write(segment)
        else:
            # we don't care about the plaintext if we are doing a verify.
            segment = None
        self._current_segment += 1


    def _validation_or_decoding_failed(self, f, readers):
        """
        I am called when a block or a salt fails to correctly validate, or when
        the decryption or decoding operation fails for some reason.  I react to
        this failure by notifying the remote server of corruption, and then
        removing the remote peer from further activity.
        """
        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, peer(s) %s, "
                 "segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader, f)
        return


    def _validate_block(self, results, segnum, reader, started):
        """
        I validate a block from one share on a remote server.
        """
        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
                                                         segnum))
        elapsed = time.time() - started
        self._status.add_fetch_timing(reader.peerid, elapsed)
        self._set_current_status("validating blocks")
        # Did we fail to fetch either of the things that we were
        # supposed to? Fail if so.
        if not results[0][0] and results[1][0]:
            # handled by the errback handler.

            # These all get batched into one query, so the resulting
            # failure should be the same for all of them, so we can just
            # use the first one.
            assert isinstance(results[0][1], failure.Failure)

            f = results[0][1]
            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "Connection error: %s" % str(f))

        block_and_salt, block_and_sharehashes = results
        block, salt = block_and_salt[1]
        blockhashes, sharehashes = block_and_sharehashes[1]

        blockhashes = dict(enumerate(blockhashes[1]))
        self.log("the reader gave me the following blockhashes: %s" % \
                 blockhashes.keys())
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes[1].keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
            try:
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
        else:
            blockhash = hashutil.block_hash(block)
        # If this works without an error, then validation is
        # successful.
        try:
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(reader.peerid,
                                    reader.shnum,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        if self.share_hash_tree.needed_hashes(reader.shnum,
                                              include_leaf=True) or \
                                              self._verify:
            try:
                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
                                            leaves={reader.shnum: bht[0]})
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(reader.peerid,
                                        reader.shnum,
                                        "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
                                                       segnum))
        return {reader.shnum: (block, salt)}


    def _get_needed_hashes(self, reader, segnum):
        """
        I get the hashes needed to validate segnum from the reader, then return
        to my caller when this is done.
        """
        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        # tree.
        #needed.discard(0)
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
        else:
            d2 = defer.succeed({}) # the logic in the next method
                                   # expects a dict
        dl = defer.DeferredList([d1, d2], consumeErrors=True)
        return dl


    def _decode_blocks(self, blocks_and_salts, segnum):
        """
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.
        """
        d = {}
        # We want to merge our dictionaries to the form
        # {shnum: blocks_and_salts}
        #
        # The dictionaries come from validate block that way, so we just
        # need to merge them.
        for block_and_salt in blocks_and_salts:
            d.update(block_and_salt[1])

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = d.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in d.items()]
        d2 = dict(share_and_shareids)
        shareids = []
        shares = []
        for shareid, share in d2.items():
            shareids.append(shareid)
            shares.append(share)

        self._set_current_status("decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
        self.log("decoding segment %d" % segnum)
        if segnum == self._num_segments - 1:
            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
        else:
            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
        def _process(buffers):
            segment = "".join(buffers)
            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                     segnum=segnum,
                     numsegs=self._num_segments,
                     level=log.NOISY)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), self._data_length))
            if segnum == self._num_segments - 1:
                size_to_use = self._tail_data_size
            else:
                size_to_use = self._segment_size
            segment = segment[:size_to_use]
            self.log(" segment len=%d" % len(segment))
            self._status.accumulate_decode_time(time.time() - started)
            return segment, salt
        d.addCallback(_process)
        return d


    def _decrypt_segment(self, segment_and_salt):
        """
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.
        """
        segment, salt = segment_and_salt
        self._set_current_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
        started = time.time()
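        # Each segment gets its own AES key, derived from the readkey and
        # the per-segment salt (for SDMF, the salt is the file-wide IV).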
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
        decryptor = AES(key)
        plaintext = decryptor.process(segment)
        self._status.accumulate_decrypt_time(time.time() - started)
        return plaintext


    def notify_server_corruption(self, peerid, shnum, reason):
        ss = self.servermap.connections[peerid]
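        # callRemoteOnly is fire-and-forget: we don't wait for (or care
        # about) the server's response.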
        ss.callRemoteOnly("advise_corrupt_share",
                          "mutable", self._storage_index, shnum, reason)


    def _try_to_validate_privkey(self, enc_privkey, reader):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            if self._verify:
                self.servermap.mark_bad_share(reader.peerid, reader.shnum,
                                              self.verinfo[-2])
                e = CorruptShareError(reader.peerid,
                                      reader.shnum,
                                      "invalid privkey")
                f = failure.Failure(e)
                self._bad_shares.add((reader.peerid, reader.shnum, f))
            return

        # it's good
        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False


    def _check_for_done(self, res):
        """
        I check to see if this Retrieve object has successfully finished
        its work.

        I can exit in the following ways:
            - If there are no more segments to download, then I exit by
              causing self._done_deferred to fire with the plaintext
              content requested by the caller.
            - If there are still segments to be downloaded, and there
              are enough active readers (readers which have not broken
              and have not given us corrupt data) to continue
              downloading, I send control back to
              _download_current_segment.
            - If there are still segments to be downloaded but there are
              not enough active peers to download them, I ask
              _add_active_peers to add more peers. If it is successful,
              it will call _download_current_segment. If there are not
              enough peers to retrieve the file, then that will cause
              _done_deferred to errback.
        """
        self.log("checking for doneness")
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")
            return self._done()

        if len(self._active_readers) >= self._required_shares:
            # More segments to download, but we have enough good peers
            # in self._active_readers that we can do that without issue,
            # so go nab the next segment.
            self.log("not done yet: on segment %d of %d" % \
                     (self._current_segment + 1, self._num_segments))
            return self._download_current_segment()

        self.log("not done yet: on segment %d of %d, need to add peers" % \
                 (self._current_segment + 1, self._num_segments))
        return self._add_active_peers()


    def _done(self):
        """
        I am called by _check_for_done when the download process has
        finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Finished")
        self._status.set_progress(1.0)

        # remember the encoding parameters, use them again next time
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._node._populate_required_shares(k)
        self._node._populate_total_shares(N)

        if self._verify:
            ret = list(self._bad_shares)
            self.log("done verifying, found %d bad shares" % len(ret))
        else:
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)


    def _failed(self):
        """
        I am called by _add_active_peers when there are not enough
        active peers left to complete the download. After making some
        useful logging statements, I return an exception to that effect
        to the caller of this Retrieve object through
        self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Failed")

        if self._verify:
            ret = list(self._bad_shares)
        else:
            format = ("ran out of peers: "
                      "have %(have)d of %(total)d segments "
                      "found %(bad)d bad shares "
                      "encoding %(k)d-of-%(n)d")
            args = {"have": self._current_segment,
                    "total": self._num_segments,
                    "need": self._last_segment,
                    "k": self._required_shares,
                    "n": self._total_shares,
                    "bad": len(self._bad_shares)}
            e = NotEnoughSharesError("%s, last failure: %s" % \
                                     (format % args, str(self._last_failure)))
            f = failure.Failure(e)
            ret = f
        eventually(self._done_deferred.callback, ret)