# tahoe-lafs: src/allmydata/mutable/retrieve.py (blob 6c2c5c9bf8c48d81c267d1a5ca79aa2b70ce1011)

import time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually, fireEventually, DeadReferenceError, \
     RemoteException

from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
     DownloadStopped, MDMF_VERSION, SDMF_VERSION
from allmydata.util.assertutil import _assert
from allmydata.util import hashutil, log, mathutil, deferredutil
from allmydata.util.dictutil import DictOfSets
from allmydata import hashtree, codec
from allmydata.storage.server import si_b2a
from pycryptopp.cipher.aes import AES
from pycryptopp.publickey import rsa

from allmydata.mutable.common import CorruptShareError, BadShareError, \
     UncoordinatedWriteError
from allmydata.mutable.layout import MDMFSlotReadProxy

class RetrieveStatus:
    implements(IRetrieveStatus)
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["decode"] = 0.0
        self.timings["decrypt"] = 0.0
        self.timings["cumulative_verify"] = 0.0
        self._problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?","?")
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_encoding(self):
        return self.encoding
    def using_helper(self):
        return self.helper
    def get_size(self):
        return self.size
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter
    def get_problems(self):
        return self._problems

    def add_fetch_timing(self, server, elapsed):
        if server not in self.timings["fetch_per_server"]:
            self.timings["fetch_per_server"][server] = []
        self.timings["fetch_per_server"][server].append(elapsed)
    def accumulate_decode_time(self, elapsed):
        self.timings["decode"] += elapsed
    def accumulate_decrypt_time(self, elapsed):
        self.timings["decrypt"] += elapsed
    def set_storage_index(self, si):
        self.storage_index = si
    def set_helper(self, helper):
        self.helper = helper
    def set_encoding(self, k, n):
        self.encoding = (k, n)
    def set_size(self, size):
        self.size = size
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def add_problem(self, server, f):
        serverid = server.get_serverid()
        self._problems[serverid] = f

class Marker:
    pass

class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.
    implements(IPushProducer)

    def __init__(self, filenode, storage_broker, servermap, verinfo,
                 fetch_privkey=False, verify=False):
        self._node = filenode
        assert self._node.get_pubkey()
        self._storage_broker = storage_broker
        self._storage_index = filenode.get_storage_index()
        assert self._node.get_readkey()
        self._last_failure = None
        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        self._running = True
        self._decoding = False
        self._bad_shares = set()

        self.servermap = servermap
        assert self._node.get_pubkey()
        self.verinfo = verinfo
        # during repair, we may be called upon to grab the private key, since
        # it wasn't picked up during a verify=False checker run, and we'll
        # need it for repair to generate a new version.
        self._need_privkey = verify or (fetch_privkey
                                        and not self._node.get_privkey())
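        # (When verifying, the encrypted private key is fetched and checked
        # too (see _try_to_validate_privkey), as long as the node is not
        # read-only.)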

        if self._need_privkey:
            # TODO: Evaluate the need for this. We'll use it if we want
            # to limit how many queries are on the wire for the privkey
            # at once.
            self._privkey_query_markers = [] # one Marker for each time we've
                                             # tried to get the privkey.

        # verify means that we are using the downloader logic to verify all
        # of our shares. This tells the downloader a few things.
        #
        # 1. We need to download all of the shares.
        # 2. We don't need to decode or decrypt the shares, since our
        #    caller doesn't care about the plaintext, only the
        #    information about which shares are or are not valid.
        # 3. When we are validating readers, we need to validate the
        #    signature on the prefix. Do we? We already do this in the
        #    servermap update?
        self._verify = verify

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)
        self.readers = {}
        self._stopped = False
        self._pause_deferred = None
        self._offset = None
        self._read_length = None
        self.log("got seqnum %d" % self.verinfo[0])


    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.retrieve"
        return log.msg(*args, **kwargs)

    def _set_current_status(self, state):
        seg = "%d/%d" % (self._current_segment, self._last_segment)
        self._status.set_status("segment %s (%s)" % (seg, state))

    ###################
    # IPushProducer

    def pauseProducing(self):
        """
        I am called by my download target if we have produced too much
        data for it to handle. I make the downloader stop producing new
        data until my resumeProducing method is called.
        """
        if self._pause_deferred is not None:
            return

        # fired when the download is unpaused.
        self._old_status = self._status.get_status()
        self._set_current_status("paused")

        self._pause_deferred = defer.Deferred()


    def resumeProducing(self):
        """
        I am called by my download target once it is ready to begin
        receiving data again.
        """
        if self._pause_deferred is None:
            return

        p = self._pause_deferred
        self._pause_deferred = None
        self._status.set_status(self._old_status)

        eventually(p.callback, None)

    def stopProducing(self):
        self._stopped = True
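        # unpausing lets any write that is blocked on the pause Deferred run
        # again, notice self._stopped, and raise DownloadStopped.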
        self.resumeProducing()


    def _check_for_paused(self, res):
        """
        I am called just before a write to the consumer. If the download
        has been paused, I return a Deferred that fires with the data to
        be written once the downloader is unpaused. If it has not been
        paused, I return the data immediately.
        """
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ignored: d.callback(res))
            return d
        return res

    def _check_for_stopped(self, res):
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        return res


    def download(self, consumer=None, offset=0, size=None):
        assert IConsumer.providedBy(consumer) or self._verify

        if consumer:
            self._consumer = consumer
            # we provide IPushProducer, so streaming=True, per
            # IConsumer.
            self._consumer.registerProducer(self, streaming=True)

        self._done_deferred = defer.Deferred()
        self._offset = offset
        self._read_length = size
        self._setup_encoding_parameters()
        self._setup_download()
        self.log("starting download")
        self._started_fetching = time.time()
        # The download process beyond this is a state machine.
        # _activate_enough_servers will select the servers that we want to
        # use for the download, and then attempt to start downloading. After
        # each segment, it will check for doneness, reacting to broken
        # servers and corrupt shares as necessary. If it runs out of good
        # servers before downloading all of the segments, _done_deferred
        # will errback.  Otherwise, it will eventually callback with the
        # contents of the mutable file.
        self.loop()
        return self._done_deferred

    def loop(self):
        d = fireEventually(None) # avoid #237 recursion limit problem
        d.addCallback(lambda ign: self._activate_enough_servers())
        d.addCallback(lambda ign: self._download_current_segment())
        # when we're done, _download_current_segment will call _done. If we
        # aren't, it will call loop() again.
        d.addErrback(self._error)
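        # note that loop() does not return this Deferred: success or failure
        # is reported to our caller through self._done_deferred, fired by
        # _done() or _error().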

    def _setup_download(self):
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # how many shares do we need?
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         prefix,
         offsets_tuple) = self.verinfo

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, server, timestamp) in shares:
            self.remaining_sharemap.add(shnum, server)
            # Reuse the SlotReader from the servermap.
            key = (self.verinfo, server.get_serverid(),
                   self._storage_index, shnum)
            if key in self.servermap.proxies:
                reader = self.servermap.proxies[key]
            else:
                reader = MDMFSlotReadProxy(server.get_rref(),
                                           self._storage_index, shnum, None)
            reader.server = server
            self.readers[shnum] = reader

        if len(self.remaining_sharemap) < k:
            self._raise_notenoughshareserror()

        self.shares = {} # maps shnum to validated blocks
        self._active_readers = [] # list of active readers for this dl.
        self._block_hash_trees = {} # shnum => hashtree

        for i in xrange(self._total_shares):
            # So we don't have to do this later.
            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)

        # We need one share hash tree for the entire file; its leaves
        # are the roots of the block hash trees for the shares that
        # comprise it, and its root is in the verinfo.
        self.share_hash_tree = hashtree.IncompleteHashTree(N)
        self.share_hash_tree.set_hashes({0: root_hash})
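        # Seeding node 0 with the root hash from the verinfo is what lets
        # _validate_block() accept or reject the share hash chains that
        # servers send later: every share hash must check out against this
        # root.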

    def decode(self, blocks_and_salts, segnum):
        """
        I am a helper method that the mutable file update process uses
        as a shortcut to decode and decrypt the segments that it needs
        to fetch in order to perform a file update. I take in a
        collection of blocks and salts, and pick some of those to make a
        segment with. I return the plaintext associated with that
        segment.
        """
        # We don't need the block hash trees in this case.
        self._block_hash_trees = None
        self._setup_encoding_parameters()

        # _decode_blocks() expects the output of a gatherResults that
        # contains the outputs of _validate_block() (each of which is a dict
        # mapping shnum to (block,salt) bytestrings).
        d = self._decode_blocks([blocks_and_salts], segnum)
        d.addCallback(self._decrypt_segment)
        return d


    def _setup_encoding_parameters(self):
        """
        I set up the encoding parameters, including k, n, the number
        of segments associated with this file, and the segment decoders.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         n,
         known_prefix,
         offsets_tuple) = self.verinfo
        self._required_shares = k
        self._total_shares = n
        self._segment_size = segsize
        self._data_length = datalength

        if not IV:
            self._version = MDMF_VERSION
        else:
            self._version = SDMF_VERSION

        if datalength and segsize:
            self._num_segments = mathutil.div_ceil(datalength, segsize)
            self._tail_data_size = datalength % segsize
        else:
            self._num_segments = 0
            self._tail_data_size = 0

        self._segment_decoder = codec.CRSDecoder()
        self._segment_decoder.set_params(segsize, k, n)

        if not self._tail_data_size:
            self._tail_data_size = segsize

        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
                                                         self._required_shares)
        if self._tail_segment_size == self._segment_size:
            self._tail_decoder = self._segment_decoder
        else:
            self._tail_decoder = codec.CRSDecoder()
            self._tail_decoder.set_params(self._tail_segment_size,
                                          self._required_shares,
                                          self._total_shares)
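        # Each decoder rebuilds one segment from any k of its n blocks. The
        # tail segment is padded up to a multiple of k (next_multiple above)
        # so it splits into k equal-sized blocks; it only needs a separate
        # decoder when that padded size differs from the full segment size.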

        self.log("got encoding parameters: "
                 "k: %d "
                 "n: %d "
                 "%d segments of %d bytes each (%d byte tail segment)" % \
                 (k, n, self._num_segments, self._segment_size,
                  self._tail_segment_size))

        # Our last task is to tell the downloader where to start and
        # where to stop. We use three parameters for that:
        #   - self._start_segment: the segment that we need to start
        #     downloading from.
        #   - self._current_segment: the next segment that we need to
        #     download.
        #   - self._last_segment: The last segment that we were asked to
        #     download.
        #
        #  We say that the download is complete when
        #  self._current_segment > self._last_segment. We use
        #  self._start_segment and self._last_segment to know when to
        #  strip things off of segments, and how much to strip.
        if self._offset:
            self.log("got offset: %d" % self._offset)
            # our start segment is the first segment containing the
            # offset we were given.
            start = self._offset // self._segment_size

            _assert(start < self._num_segments,
                    start=start, num_segments=self._num_segments,
                    offset=self._offset, segment_size=self._segment_size)
            self._start_segment = start
            self.log("got start segment: %d" % self._start_segment)
        else:
            self._start_segment = 0


        # If self._read_length is None, then we want to read the whole
        # file. Otherwise, we want to read only part of the file, and
        # need to figure out where to stop reading.
        if self._read_length is not None:
            # our end segment is the last segment containing part of the
            # data that we were asked to read.
            self.log("got read length %d" % self._read_length)
            if self._read_length != 0:
                end_data = self._offset + self._read_length

                # We don't actually need to read the byte at end_data,
                # but the one before it.
                end = (end_data - 1) // self._segment_size

                _assert(end < self._num_segments,
                        end=end, num_segments=self._num_segments,
                        end_data=end_data, offset=self._offset, read_length=self._read_length,
                        segment_size=self._segment_size)
                self._last_segment = end
            else:
                self._last_segment = self._start_segment
            self.log("got end segment: %d" % self._last_segment)
        else:
            self._last_segment = self._num_segments - 1

        self._current_segment = self._start_segment
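        # For example, with segsize=1000, offset=2500 and read_length=1000:
        # start = 2500 // 1000 = 2, end_data = 3500, and
        # last = (3500 - 1) // 1000 = 3, so segments 2 and 3 are fetched.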

    def _activate_enough_servers(self):
        """
        I populate self._active_readers with enough active readers to
        retrieve the contents of this mutable file. I am called before
        downloading starts, and (eventually) after each validation
        error, connection error, or other problem in the download.
        """
        # TODO: It would be cool to investigate other heuristics for
        # reader selection. For instance, the cost (in time the user
        # spends waiting for their file) of selecting a really slow server
        # that happens to have a primary share is probably more than
        # selecting a really fast server that doesn't have a primary
        # share. Maybe the servermap could be extended to provide this
        # information; it could keep track of latency information while
        # it gathers more important data, and then this routine could
        # use that to select active readers.
        #
        # (these and other questions would be easier to answer with a
        #  robust, configurable tahoe-lafs simulator, which modeled node
        #  failures, differences in node speed, and other characteristics
        #  that we expect storage servers to have.  You could have
        #  presets for really stable grids (like allmydata.com),
        #  friendnets, make it easy to configure your own settings, and
        #  then simulate the effect of big changes on these use cases
        #  instead of just reasoning about what the effect might be. Out
        #  of scope for MDMF, though.)

        # XXX: Why don't format= log messages work here?

        known_shnums = set(self.remaining_sharemap.keys())
        used_shnums = set([r.shnum for r in self._active_readers])
        unused_shnums = known_shnums - used_shnums

        if self._verify:
            new_shnums = unused_shnums # use them all
        elif len(self._active_readers) < self._required_shares:
            # need more shares
            more = self._required_shares - len(self._active_readers)
            # We favor lower numbered shares, since FEC is faster with
            # primary shares than with other shares, and lower-numbered
            # shares are more likely to be primary than higher numbered
            # shares.
            new_shnums = sorted(unused_shnums)[:more]
            if len(new_shnums) < more:
                # We don't have enough readers to retrieve the file; fail.
                self._raise_notenoughshareserror()
        else:
            new_shnums = []

        self.log("adding %d new servers to the active list" % len(new_shnums))
        for shnum in new_shnums:
            reader = self.readers[shnum]
            self._active_readers.append(reader)
            self.log("added reader for share %d" % shnum)
            # Each time we add a reader, we check to see if we need the
            # private key. If we do, we politely ask for it and then continue
            # computing. If we find that we haven't gotten it at the end of
            # segment decoding, then we'll take more drastic measures.
            if self._need_privkey and not self._node.is_readonly():
                d = reader.get_encprivkey()
                d.addCallback(self._try_to_validate_privkey, reader, reader.server)
                # XXX: don't just drop the Deferred. We need error-reporting
                # but not flow-control here.

    def _try_to_validate_prefix(self, prefix, reader):
        """
        I check that the prefix returned by a candidate server for
        retrieval matches the prefix that the servermap knows about
        (and, hence, the prefix that was validated earlier). If it does,
        I return without incident, which means that I approve of the use
        of the candidate server for segment retrieval. If it doesn't, I
        raise UncoordinatedWriteError, since a mismatched prefix most
        likely indicates an uncoordinated write.
        """
        (seqnum,
         root_hash,
         IV,
         segsize,
         datalength,
         k,
         N,
         known_prefix,
         offsets_tuple) = self.verinfo
        if known_prefix != prefix:
            self.log("prefix from share %d doesn't match" % reader.shnum)
            raise UncoordinatedWriteError("Mismatched prefix -- this could "
                                          "indicate an uncoordinated write")
        # Otherwise, we're okay -- no issues.

    def _mark_bad_share(self, server, shnum, reader, f):
        """
        I mark the given (server, shnum) as a bad share, which means that it
        will not be used anywhere else.

        There are several reasons to want to mark something as a bad
        share. These include:

            - A connection error to the server.
            - A mismatched prefix (that is, a prefix that does not match
              our local conception of the version information string).
            - A failing block hash, salt hash, share hash, or other
              integrity check.

        This method will ensure that readers that we wish to mark bad
        (for these reasons or other reasons) are not used for the rest
        of the download. Additionally, it will attempt to tell the
        remote server (with no guarantee of success) that its share is
        corrupt.
        """
        self.log("marking share %d on server %s as bad" % \
                 (shnum, server.get_name()))
        prefix = self.verinfo[-2]
        self.servermap.mark_bad_share(server, shnum, prefix)
        self._bad_shares.add((server, shnum, f))
        self._status.add_problem(server, f)
        self._last_failure = f

        # Remove the reader from _active_readers
        self._active_readers.remove(reader)
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, reader.server)
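        # also forget every share this server was offering, so that it is
        # never selected again for any share number.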

        if f.check(BadShareError):
            self.notify_server_corruption(server, shnum, str(f.value))

    def _download_current_segment(self):
        """
        I download, validate, decode, decrypt, and assemble the segment
        that this Retrieve is currently responsible for downloading.
        """
        if self._current_segment > self._last_segment:
            # No more segments to download, we're done.
            self.log("got plaintext, done")
            return self._done()
        elif self._verify and len(self._active_readers) == 0:
            self.log("no more good shares, no need to keep verifying")
            return self._done()
        self.log("on segment %d of %d" %
                 (self._current_segment + 1, self._num_segments))
        d = self._process_segment(self._current_segment)
        d.addCallback(lambda ign: self.loop())
        return d

    def _process_segment(self, segnum):
        """
        I download, validate, decode, and decrypt one segment of the
        file that this Retrieve is retrieving. This means coordinating
        the process of getting k blocks of that file, validating them,
        assembling them into one segment with the decoder, and then
        decrypting them.
        """
        self.log("processing segment %d" % segnum)

        # TODO: The old code uses a marker. Should this code do that
        # too? What did the Marker do?

        # We need to ask each of our active readers for its block and
        # salt. We will then validate those. If validation is
        # successful, we will assemble the results into plaintext.
        ds = []
        for reader in self._active_readers:
            started = time.time()
            d1 = reader.get_block_and_salt(segnum)
            d2,d3 = self._get_needed_hashes(reader, segnum)
            d = deferredutil.gatherResults([d1,d2,d3])
            d.addCallback(self._validate_block, segnum, reader, reader.server, started)
            # _handle_bad_share takes care of recoverable errors (by dropping
            # that share and returning None). Any other errors (i.e. code
            # bugs) are passed through and cause the retrieve to fail.
            d.addErrback(self._handle_bad_share, [reader])
            ds.append(d)
        dl = deferredutil.gatherResults(ds)
        if self._verify:
            dl.addCallback(lambda ignored: "")
            dl.addCallback(self._set_segment)
        else:
            dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
        return dl


    def _maybe_decode_and_decrypt_segment(self, results, segnum):
        """
        I take the results of fetching and validating the blocks from
        _process_segment. If validation and fetching succeeded without
        incident, I will proceed with decoding and decryption. Otherwise, I
        will do nothing.
        """
        self.log("trying to decode and decrypt segment %d" % segnum)

        # 'results' is the output of a gatherResults set up in
        # _process_segment(). Each component Deferred will either contain the
        # non-Failure output of _validate_block() for a single block (i.e.
        # {shnum: (block, salt)}), or None if _validate_block threw an
        # exception and _handle_bad_share handled it (by dropping that
        # server).

        if None in results:
            self.log("some validation operations failed; not proceeding")
            return defer.succeed(None)
        self.log("everything looks ok, building segment %d" % segnum)
        d = self._decode_blocks(results, segnum)
        d.addCallback(self._decrypt_segment)
        # check to see whether we've been paused before writing
        # anything.
        d.addCallback(self._check_for_paused)
        d.addCallback(self._check_for_stopped)
        d.addCallback(self._set_segment)
        return d


    def _set_segment(self, segment):
        """
        Given a plaintext segment, I register that segment with the
        target that is handling the file download.
        """
        self.log("got plaintext for segment %d" % self._current_segment)
        if self._current_segment == self._start_segment:
            # We're on the first segment. It's possible that we want
            # only some part of the end of this segment, and that we
            # just downloaded the whole thing to get that part. If so,
            # we need to account for that and give the reader just the
            # data that they want.
            n = self._offset % self._segment_size
            self.log("stripping %d bytes off of the first segment" % n)
            self.log("original segment length: %d" % len(segment))
            segment = segment[n:]
            self.log("new segment length: %d" % len(segment))

        if self._current_segment == self._last_segment and self._read_length is not None:
            # We're on the last segment. It's possible that we only want
            # part of the beginning of this segment, and that we
            # downloaded the whole thing anyway. Make sure to give the
            # caller only the portion of the segment that they want to
            # receive.
            extra = self._read_length
            if self._start_segment != self._last_segment:
                extra -= self._segment_size - \
                            (self._offset % self._segment_size)
            extra %= self._segment_size
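            # at this point 'extra' is the number of bytes we want from this
            # last segment. Continuing the example above (segsize=1000,
            # offset=2500, read_length=1000): extra = 1000 - (1000 - 500) = 500,
            # and 500 % 1000 = 500, so the first 500 bytes of segment 3 are kept.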
            self.log("original segment length: %d" % len(segment))
            segment = segment[:extra]
            self.log("new segment length: %d" % len(segment))
            self.log("only taking %d bytes of the last segment" % extra)

        if not self._verify:
            self._consumer.write(segment)
        else:
            # we don't care about the plaintext if we are doing a verify.
            segment = None
        self._current_segment += 1


    def _handle_bad_share(self, f, readers):
        """
        I am called when a block or a salt fails to correctly validate, or when
        the decryption or decoding operation fails for some reason.  I react to
        this failure by notifying the remote server of corruption, and then
        removing the remote server from further activity.
        """
        # these are the errors we can tolerate: by giving up on this share
        # and finding others to replace it. Any other errors (i.e. coding
        # bugs) are re-raised, causing the download to fail.
        f.trap(DeadReferenceError, RemoteException, BadShareError)

        # DeadReferenceError happens when we try to fetch data from a server
        # that has gone away. RemoteException happens if the server had an
        # internal error. BadShareError encompasses: (UnknownVersionError,
        # LayoutInvalid, struct.error) which happen when we get obviously
        # wrong data, and CorruptShareError which happens later, when we
        # perform integrity checks on the data.

        assert isinstance(readers, list)
        bad_shnums = [reader.shnum for reader in readers]

        self.log("validation or decoding failed on share(s) %s, server(s) %s "
                 ", segment %d: %s" % \
                 (bad_shnums, readers, self._current_segment, str(f)))
        for reader in readers:
            self._mark_bad_share(reader.server, reader.shnum, reader, f)
        return None


    def _validate_block(self, results, segnum, reader, server, started):
        """
        I validate a block from one share on a remote server.
        """
        # Grab the part of the block hash tree that is necessary to
        # validate this block, then generate the block hash root.
        self.log("validating share %d for segment %d" % (reader.shnum,
                                                             segnum))
        elapsed = time.time() - started
        self._status.add_fetch_timing(server, elapsed)
        self._set_current_status("validating blocks")

        block_and_salt, blockhashes, sharehashes = results
        block, salt = block_and_salt
        assert type(block) is str, (block, salt)

        blockhashes = dict(enumerate(blockhashes))
        self.log("the reader gave me the following blockhashes: %s" % \
                 blockhashes.keys())
        self.log("the reader gave me the following sharehashes: %s" % \
                 sharehashes.keys())
        bht = self._block_hash_trees[reader.shnum]

        if bht.needed_hashes(segnum, include_leaf=True):
            try:
                bht.set_hashes(blockhashes)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                    IndexError), e:
                raise CorruptShareError(server,
                                        reader.shnum,
                                        "block hash tree failure: %s" % e)

        if self._version == MDMF_VERSION:
            blockhash = hashutil.block_hash(salt + block)
        else:
            blockhash = hashutil.block_hash(block)
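        # MDMF folds the per-segment salt into the block hash, so a tampered
        # salt will fail validation here; SDMF hashes the block alone.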
        # If this works without an error, then validation is
        # successful.
        try:
            bht.set_hashes(leaves={segnum: blockhash})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(server,
                                    reader.shnum,
                                    "block hash tree failure: %s" % e)

        # Reaching this point means that we know that this segment
        # is correct. Now we need to check to see whether the share
        # hash chain is also correct.
        # SDMF wrote share hash chains that didn't contain the
        # leaves, which would be produced from the block hash tree.
        # So we need to validate the block hash tree first. If
        # successful, then bht[0] will contain the root for the
        # shnum, which will be a leaf in the share hash tree, which
        # will allow us to validate the rest of the tree.
        try:
            self.share_hash_tree.set_hashes(hashes=sharehashes,
                                        leaves={reader.shnum: bht[0]})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
                IndexError), e:
            raise CorruptShareError(server,
                                    reader.shnum,
                                    "corrupt hashes: %s" % e)

        self.log('share %d is valid for segment %d' % (reader.shnum,
                                                       segnum))
        return {reader.shnum: (block, salt)}


    def _get_needed_hashes(self, reader, segnum):
        """
        I get the hashes needed to validate segnum from the reader, then return
        to my caller when this is done.
        """
        bht = self._block_hash_trees[reader.shnum]
        needed = bht.needed_hashes(segnum, include_leaf=True)
        # The root of the block hash tree is also a leaf in the share
        # hash tree. So we don't need to fetch it from the remote
        # server. In the case of files with one segment, this means that
        # we won't fetch any block hash tree from the remote server,
        # since the hash of each share of the file is the entire block
        # hash tree, and is a leaf in the share hash tree. This is fine,
        # since any share corruption will be detected in the share hash
        # tree.
        #needed.discard(0)
        self.log("getting blockhashes for segment %d, share %d: %s" % \
                 (segnum, reader.shnum, str(needed)))
        # TODO is force_remote necessary here?
        d1 = reader.get_blockhashes(needed, force_remote=False)
        if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
            d2 = reader.get_sharehashes(need, force_remote=False)
        else:
            d2 = defer.succeed({}) # the logic in the next method
                                   # expects a dict
        return d1,d2


    def _decode_blocks(self, results, segnum):
        """
        I take a list of k blocks and salts, and decode that into a
        single encrypted segment.
        """
        # 'results' is one or more dicts (each {shnum:(block,salt)}), and we
        # want to merge them all
        blocks_and_salts = {}
        for d in results:
            blocks_and_salts.update(d)

        # All of these blocks should have the same salt; in SDMF, it is
        # the file-wide IV, while in MDMF it is the per-segment salt. In
        # either case, we just need to get one of them and use it.
        #
        # d.items()[0] is like (shnum, (block, salt))
        # d.items()[0][1] is like (block, salt)
        # d.items()[0][1][1] is the salt.
        salt = blocks_and_salts.items()[0][1][1]
        # Next, extract just the blocks from the dict. We'll use the
        # salt in the next step.
        share_and_shareids = [(k, v[0]) for k, v in blocks_and_salts.items()]
        d2 = dict(share_and_shareids)
        shareids = []
        shares = []
        for shareid, share in d2.items():
            shareids.append(shareid)
            shares.append(share)

        self._set_current_status("decoding")
        started = time.time()
        assert len(shareids) >= self._required_shares, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:self._required_shares]
        shares = shares[:self._required_shares]
        self.log("decoding segment %d" % segnum)
        if segnum == self._num_segments - 1:
            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
        else:
            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
        def _process(buffers):
            segment = "".join(buffers)
            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
                     segnum=segnum,
                     numsegs=self._num_segments,
                     level=log.NOISY)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), self._data_length))
            if segnum == self._num_segments - 1:
                size_to_use = self._tail_data_size
            else:
                size_to_use = self._segment_size
            segment = segment[:size_to_use]
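            # the decoder's output covers the padded segment size, so trim it
            # back down; in particular the tail segment's real data
            # (_tail_data_size) may be shorter than the size it was encoded at.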
            self.log(" segment len=%d" % len(segment))
            self._status.accumulate_decode_time(time.time() - started)
            return segment, salt
        d.addCallback(_process)
        return d


    def _decrypt_segment(self, segment_and_salt):
        """
        I take a single segment and its salt, and decrypt it. I return
        the plaintext of the segment that is in my argument.
        """
        segment, salt = segment_and_salt
        self._set_current_status("decrypting")
        self.log("decrypting segment %d" % self._current_segment)
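        # the decryption key is derived from this segment's salt (SDMF's
        # file-wide IV, or MDMF's per-segment salt) and the node's readkey,
        # so each segment can be decrypted independently.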
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
        decryptor = AES(key)
        plaintext = decryptor.process(segment)
        self._status.accumulate_decrypt_time(time.time() - started)
        return plaintext


    def notify_server_corruption(self, server, shnum, reason):
        rref = server.get_rref()
        rref.callRemoteOnly("advise_corrupt_share",
                            "mutable", self._storage_index, shnum, reason)


    def _try_to_validate_privkey(self, enc_privkey, reader, server):
        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != self._node.get_writekey():
            self.log("invalid privkey from %s shnum %d" %
                     (reader, reader.shnum),
                     level=log.WEIRD, umid="YIw4tA")
            if self._verify:
                self.servermap.mark_bad_share(server, reader.shnum,
                                              self.verinfo[-2])
                e = CorruptShareError(server,
                                      reader.shnum,
                                      "invalid privkey")
                f = failure.Failure(e)
                self._bad_shares.add((server, reader.shnum, f))
            return

        # it's good
        self.log("got valid privkey from shnum %d on reader %s" %
                 (reader.shnum, reader))
        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False



    def _done(self):
        """
        I am called by _download_current_segment when the download process
        has finished successfully. After making some useful logging
        statements, I return the decrypted contents to the owner of this
        Retrieve object through self._done_deferred.
        """
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Finished")
        self._status.set_progress(1.0)

        # remember the encoding parameters, use them again next time
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._node._populate_required_shares(k)
        self._node._populate_total_shares(N)

        if self._verify:
            ret = self._bad_shares
            self.log("done verifying, found %d bad shares" % len(ret))
        else:
            # TODO: upload status here?
            ret = self._consumer
            self._consumer.unregisterProducer()
        eventually(self._done_deferred.callback, ret)


    def _raise_notenoughshareserror(self):
        """
        I am called when there are not enough active servers left to complete
        the download. After making some useful logging statements, I throw an
        exception to that effect to the caller of this Retrieve object through
        self._done_deferred.
        """

        format = ("ran out of servers: "
                  "have %(have)d of %(total)d segments; "
                  "found %(bad)d bad shares; "
                  "have %(remaining)d remaining shares of the right version; "
                  "encoding %(k)d-of-%(n)d")
        args = {"have": self._current_segment,
                "total": self._num_segments,
                "need": self._last_segment,
                "k": self._required_shares,
                "n": self._total_shares,
                "bad": len(self._bad_shares),
                "remaining": len(self.remaining_sharemap),
               }
        raise NotEnoughSharesError("%s, last failure: %s" %
                                   (format % args, str(self._last_failure)))

    def _error(self, f):
        # all errors, including NotEnoughSharesError, land here
        self._running = False
        self._status.set_active(False)
        now = time.time()
        self._status.timings['total'] = now - self._started
        self._status.timings['fetch'] = now - self._started_fetching
        self._status.set_status("Failed")
        eventually(self._done_deferred.errback, f)