src/allmydata/mutable/retrieve.py (tahoe-lafs, blob 100350a0b62b00925bfae12dddd70b56649665e6)
1
2 import time
3 from itertools import count
4 from zope.interface import implements
5 from twisted.internet import defer
6 from twisted.python import failure
7 from twisted.internet.interfaces import IPushProducer, IConsumer
8 from foolscap.api import eventually, fireEventually
9 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
10                                  MDMF_VERSION, SDMF_VERSION
11 from allmydata.util import hashutil, log, mathutil
12 from allmydata.util.dictutil import DictOfSets
13 from allmydata import hashtree, codec
14 from allmydata.storage.server import si_b2a
15 from pycryptopp.cipher.aes import AES
16 from pycryptopp.publickey import rsa
17
18 from allmydata.mutable.common import CorruptShareError, UncoordinatedWriteError
19 from allmydata.mutable.layout import MDMFSlotReadProxy
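# Overview comment (added for orientation): Retrieve implements the download
# side of mutable files. Given a ServerMap and one version (verinfo), it
# fetches blocks and hash-chain nodes from enough servers, validates them
# against the block and share hash trees, FEC-decodes each segment from k
# blocks, decrypts it, and streams the plaintext to an IConsumer (or, in
# verify mode, simply checks every share and reports the bad ones).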
20
21 class RetrieveStatus:
22     implements(IRetrieveStatus)
23     statusid_counter = count(0)
24     def __init__(self):
25         self.timings = {}
26         self.timings["fetch_per_server"] = {}
27         self.timings["decode"] = 0.0
28         self.timings["decrypt"] = 0.0
29         self.timings["cumulative_verify"] = 0.0
30         self.problems = {}
31         self.active = True
32         self.storage_index = None
33         self.helper = False
34         self.encoding = ("?","?")
35         self.size = None
36         self.status = "Not started"
37         self.progress = 0.0
38         self.counter = self.statusid_counter.next()
39         self.started = time.time()
40
41     def get_started(self):
42         return self.started
43     def get_storage_index(self):
44         return self.storage_index
45     def get_encoding(self):
46         return self.encoding
47     def using_helper(self):
48         return self.helper
49     def get_size(self):
50         return self.size
51     def get_status(self):
52         return self.status
53     def get_progress(self):
54         return self.progress
55     def get_active(self):
56         return self.active
57     def get_counter(self):
58         return self.counter
59
60     def add_fetch_timing(self, peerid, elapsed):
61         if peerid not in self.timings["fetch_per_server"]:
62             self.timings["fetch_per_server"][peerid] = []
63         self.timings["fetch_per_server"][peerid].append(elapsed)
64     def accumulate_decode_time(self, elapsed):
65         self.timings["decode"] += elapsed
66     def accumulate_decrypt_time(self, elapsed):
67         self.timings["decrypt"] += elapsed
68     def set_storage_index(self, si):
69         self.storage_index = si
70     def set_helper(self, helper):
71         self.helper = helper
72     def set_encoding(self, k, n):
73         self.encoding = (k, n)
74     def set_size(self, size):
75         self.size = size
76     def set_status(self, status):
77         self.status = status
78     def set_progress(self, value):
79         self.progress = value
80     def set_active(self, value):
81         self.active = value
82
83 class Marker:
84     pass
85
86 class Retrieve:
87     # this class is currently single-use. Eventually (in MDMF) we will make
88     # it multi-use, in which case you can call download(range) multiple
89     # times, and each will have a separate response chain. However the
90     # Retrieve object will remain tied to a specific version of the file, and
91     # will use a single ServerMap instance.
92     implements(IPushProducer)
93
94     def __init__(self, filenode, servermap, verinfo, fetch_privkey=False,
95                  verify=False):
96         self._node = filenode
97         assert self._node.get_pubkey()
98         self._storage_index = filenode.get_storage_index()
99         assert self._node.get_readkey()
100         self._last_failure = None
101         prefix = si_b2a(self._storage_index)[:5]
102         self._log_number = log.msg("Retrieve(%s): starting" % prefix)
103         self._outstanding_queries = {} # maps (peerid,shnum) to start_time
104         self._running = True
105         self._decoding = False
106         self._bad_shares = set()
107
108         self.servermap = servermap
109         assert self._node.get_pubkey()
110         self.verinfo = verinfo
111         # during repair, we may be called upon to grab the private key, since
112         # it wasn't picked up during a verify=False checker run, and we'll
113         # need it for repair to generate a new version.
114         self._need_privkey = verify or (fetch_privkey
115                                         and not self._node.get_privkey())
116
117         if self._need_privkey:
118             # TODO: Evaluate the need for this. We'll use it if we want
119             # to limit how many queries are on the wire for the privkey
120             # at once.
121             self._privkey_query_markers = [] # one Marker for each time we've
122                                              # tried to get the privkey.
123
124         # verify means that we are using the downloader logic to verify all
125         # of our shares. This tells the downloader a few things.
126         # 
127         # 1. We need to download all of the shares.
128         # 2. We don't need to decode or decrypt the shares, since our
129         #    caller doesn't care about the plaintext, only the
130         #    information about which shares are or are not valid.
131         # 3. When we are validating readers, we need to validate the
132         #    signature on the prefix. (Do we? We already do this during
133         #    the servermap update.)
134         self._verify = verify
135
136         self._status = RetrieveStatus()
137         self._status.set_storage_index(self._storage_index)
138         self._status.set_helper(False)
139         self._status.set_progress(0.0)
140         self._status.set_active(True)
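        # self.verinfo is the servermap's version tuple: (seqnum, root_hash,
        # IV, segsize, datalength, k, N, prefix, offsets_tuple). An MDMF
        # verinfo has an empty IV; see _setup_encoding_parameters.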
141         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
142          offsets_tuple) = self.verinfo
143         self._status.set_size(datalength)
144         self._status.set_encoding(k, N)
145         self.readers = {}
146         self._pause_deferred = None
147         self._offset = None
148         self._read_length = None
149         self.log("got seqnum %d" % self.verinfo[0])
150
151
152     def get_status(self):
153         return self._status
154
155     def log(self, *args, **kwargs):
156         if "parent" not in kwargs:
157             kwargs["parent"] = self._log_number
158         if "facility" not in kwargs:
159             kwargs["facility"] = "tahoe.mutable.retrieve"
160         return log.msg(*args, **kwargs)
161
162     def _set_current_status(self, state):
163         seg = "%d/%d" % (self._current_segment, self._last_segment)
164         self._status.set_status("segment %s (%s)" % (seg, state))
165
166     ###################
167     # IPushProducer
168
169     def pauseProducing(self):
170         """
171         I am called by my download target if we have produced too much
172         data for it to handle. I make the downloader stop producing new
173         data until my resumeProducing method is called.
174         """
175         if self._pause_deferred is not None:
176             return
177
178         self._old_status = self._status.get_status()
179         self._set_current_status("paused")
180
181         # _pause_deferred is fired when the download is unpaused.
182         self._pause_deferred = defer.Deferred()
183
184
185     def resumeProducing(self):
186         """
187         I am called by my download target once it is ready to begin
188         receiving data again.
189         """
190         if self._pause_deferred is None:
191             return
192
193         p = self._pause_deferred
194         self._pause_deferred = None
195         self._status.set_status(self._old_status)
196
197         eventually(p.callback, None)
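        # Any writes that _check_for_paused queued while we were paused are
        # released when p fires.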
198
199
200     def _check_for_paused(self, res):
201         """
202         I am called just before a write to the consumer. I return a
203         Deferred that eventually fires with the data that is to be
204         written to the consumer. If the download has not been paused,
205         the Deferred fires immediately. Otherwise, the Deferred fires
206         when the downloader is unpaused.
207         """
208         if self._pause_deferred is not None:
209             d = defer.Deferred()
210             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
211             return d
212         return defer.succeed(res)
213
214
215     def download(self, consumer=None, offset=0, size=None):
216         assert IConsumer.providedBy(consumer) or self._verify
217
218         if consumer:
219             self._consumer = consumer
220             # we provide IPushProducer, so streaming=True, per
221             # IConsumer.
222             self._consumer.registerProducer(self, streaming=True)
223
224         self._done_deferred = defer.Deferred()
225         self._offset = offset
226         self._read_length = size
227         self._setup_download()
228         self._setup_encoding_parameters()
229         self.log("starting download")
230         self._started_fetching = time.time()
231         # The download process beyond this is a state machine.
232         # _add_active_peers will select the peers that we want to use
233         # for the download, and then attempt to start downloading. After
234         # each segment, it will check for doneness, reacting to broken
235         # peers and corrupt shares as necessary. If it runs out of good
236         # peers before downloading all of the segments, _done_deferred
237         # will errback.  Otherwise, it will eventually callback with the
238         # contents of the mutable file.
239         self.loop()
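        # _done_deferred fires with the consumer once the last segment has
        # been written, or, in verify mode, with a list of (peerid, shnum,
        # failure) tuples describing the bad shares found; see _done.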
240         return self._done_deferred
241
242     def loop(self):
243         d = fireEventually(None) # avoid #237 recursion limit problem
244         d.addCallback(lambda ign: self._activate_enough_peers())
245         d.addCallback(lambda ign: self._download_current_segment())
246         # when we're done, _download_current_segment will call _done. If we
247         # aren't done yet, it will call loop() again.
248         d.addErrback(self._error)
249
250     def _setup_download(self):
251         self._started = time.time()
252         self._status.set_status("Retrieving Shares")
253
254         # how many shares do we need?
255         (seqnum,
256          root_hash,
257          IV,
258          segsize,
259          datalength,
260          k,
261          N,
262          prefix,
263          offsets_tuple) = self.verinfo
264
265         # first, which servers can we use?
266         versionmap = self.servermap.make_versionmap()
267         shares = versionmap[self.verinfo]
268         # this sharemap is consumed as we decide to send requests
269         self.remaining_sharemap = DictOfSets()
270         for (shnum, peerid, timestamp) in shares:
271             self.remaining_sharemap.add(shnum, peerid)
272             # If the servermap update fetched anything, it fetched at least 1
273             # KiB, so we ask for that much.
274             # TODO: Change the cache methods to allow us to fetch all of the
275             # data that they have, then change this method to do that.
276             any_cache = self._node._read_from_cache(self.verinfo, shnum,
277                                                     0, 1000)
278             ss = self.servermap.connections[peerid]
279             reader = MDMFSlotReadProxy(ss,
280                                        self._storage_index,
281                                        shnum,
282                                        any_cache)
283             reader.peerid = peerid
284             self.readers[shnum] = reader
285         assert len(self.remaining_sharemap) >= k
286
287         self.shares = {} # maps shnum to validated blocks
288         self._active_readers = [] # list of active readers for this dl.
289         self._block_hash_trees = {} # shnum => hashtree
290
291         # We need one share hash tree for the entire file; its leaves
292         # are the roots of the block hash trees for the shares that
293         # comprise it, and its root is in the verinfo.
294         self.share_hash_tree = hashtree.IncompleteHashTree(N)
295         self.share_hash_tree.set_hashes({0: root_hash})
296
297     def decode(self, blocks_and_salts, segnum):
298         """
299         I am a helper method that the mutable file update process uses
300         as a shortcut to decode and decrypt the segments that it needs
301         to fetch in order to perform a file update. I take in a
302         collection of blocks and salts, and pick some of those to make a
303         segment with. I return the plaintext associated with that
304         segment.
305         """
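        # blocks_and_salts maps shnum -> (block, salt) and must contain at
        # least k entries for the segment identified by segnum.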
306         # shnum => block hash tree. We set this to None so that
307         # _setup_encoding_parameters knows not to build per-share trees.
308         self._block_hash_trees = None
309         self._setup_encoding_parameters()
310
311         # _decode_blocks expects DeferredList-style (success, data) pairs.
312         blocks_and_salts = blocks_and_salts.items()
313         blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
314
315         d = self._decode_blocks(blocks_and_salts, segnum)
316         d.addCallback(self._decrypt_segment)
317         return d
318
319
320     def _setup_encoding_parameters(self):
321         """
322         I set up the encoding parameters, including k, n, the number
323         of segments associated with this file, and the segment decoders.
324         """
325         (seqnum,
326          root_hash,
327          IV,
328          segsize,
329          datalength,
330          k,
331          n,
332          known_prefix,
333          offsets_tuple) = self.verinfo
334         self._required_shares = k
335         self._total_shares = n
336         self._segment_size = segsize
337         self._data_length = datalength
338
339         if not IV:
340             self._version = MDMF_VERSION
341         else:
342             self._version = SDMF_VERSION
343
344         if datalength and segsize:
345             self._num_segments = mathutil.div_ceil(datalength, segsize)
346             self._tail_data_size = datalength % segsize
347         else:
348             self._num_segments = 0
349             self._tail_data_size = 0
350
351         self._segment_decoder = codec.CRSDecoder()
352         self._segment_decoder.set_params(segsize, k, n)
353
354         if not self._tail_data_size:
355             self._tail_data_size = segsize
356
357         self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
358                                                          self._required_shares)
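        # (e.g. with k=3 and a 10-byte tail, the tail segment is padded up to
        # next_multiple(10, 3) == 12 bytes so it splits evenly into k shares.)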
359         if self._tail_segment_size == self._segment_size:
360             self._tail_decoder = self._segment_decoder
361         else:
362             self._tail_decoder = codec.CRSDecoder()
363             self._tail_decoder.set_params(self._tail_segment_size,
364                                           self._required_shares,
365                                           self._total_shares)
366
367         self.log("got encoding parameters: "
368                  "k: %d "
369                  "n: %d "
370                  "%d segments of %d bytes each (%d byte tail segment)" % \
371                  (k, n, self._num_segments, self._segment_size,
372                   self._tail_segment_size))
373
374         if self._block_hash_trees is not None:
375             for i in xrange(self._total_shares):
376                 # So we don't have to do this later.
377                 self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
378
379         # Our last task is to tell the downloader where to start and
380         # where to stop. We use three parameters for that:
381         #   - self._start_segment: the segment that we need to start
382         #     downloading from. 
383         #   - self._current_segment: the next segment that we need to
384         #     download.
385         #   - self._last_segment: The last segment that we were asked to
386         #     download.
387         #
388         #  We say that the download is complete when
389         #  self._current_segment > self._last_segment. We use
390         #  self._start_segment and self._last_segment to know when to
391         #  strip things off of segments, and how much to strip.
392         if self._offset:
393             self.log("got offset: %d" % self._offset)
394             # our start segment is the first segment containing the
395             # offset we were given. 
396             start = self._offset // self._segment_size
397
398             assert start < self._num_segments
399             self._start_segment = start
400             self.log("got start segment: %d" % self._start_segment)
401         else:
402             self._start_segment = 0
403
404
405         # If self._read_length is None, then we want to read the whole
406         # file. Otherwise, we want to read only part of the file, and
407         # need to figure out where to stop reading.
408         if self._read_length is not None:
409             # our end segment is the last segment containing part of the
410             # segment that we were asked to read.
411             self.log("got read length %d" % self._read_length)
412             if self._read_length != 0:
413                 end_data = self._offset + self._read_length
414
415                 # We don't actually need to read the byte at end_data,
416                 # but the one before it.
417                 end = (end_data - 1) // self._segment_size
418
419                 assert end < self._num_segments
420                 self._last_segment = end
421             else:
422                 self._last_segment = self._start_segment
423             self.log("got end segment: %d" % self._last_segment)
424         else:
425             self._last_segment = self._num_segments - 1
426
427         self._current_segment = self._start_segment
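        # Worked example: with segsize=1000, offset=2500 and read_length=1000,
        # start = 2500 // 1000 = 2 and end = (3500 - 1) // 1000 = 3, so we
        # fetch segments 2 and 3 and let _set_segment trim the excess bytes.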
428
429     def _activate_enough_peers(self):
430         """
431         I populate self._active_readers with enough active readers to
432         retrieve the contents of this mutable file. I am called before
433         downloading starts, and (eventually) after each validation
434         error, connection error, or other problem in the download.
435         """
436         # TODO: It would be cool to investigate other heuristics for
437         # reader selection. For instance, the cost (in time the user
438         # spends waiting for their file) of selecting a really slow peer
439         # that happens to have a primary share is probably more than
440         # selecting a really fast peer that doesn't have a primary
441         # share. Maybe the servermap could be extended to provide this
442         # information; it could keep track of latency information while
443         # it gathers more important data, and then this routine could
444         # use that to select active readers.
445         #
446         # (these and other questions would be easier to answer with a
447         #  robust, configurable tahoe-lafs simulator, which modeled node
448         #  failures, differences in node speed, and other characteristics
449         #  that we expect storage servers to have.  You could have
450         #  presets for really stable grids (like allmydata.com),
451         #  friendnets, make it easy to configure your own settings, and
452         #  then simulate the effect of big changes on these use cases
453         #  instead of just reasoning about what the effect might be. Out
454         #  of scope for MDMF, though.)
455
456         # We need at least self._required_shares readers to download a
457         # segment. If we're verifying, we need all shares.
458         if self._verify:
459             needed = self._total_shares
460         else:
461             needed = self._required_shares
462         # XXX: Why don't format= log messages work here?
463         self.log("adding %d peers to the active peers list" % needed)
464
465         if len(self._active_readers) >= needed:
466             # enough shares are active
467             return
468
469         more = needed - len(self._active_readers)
470         known_shnums = set(self.remaining_sharemap.keys())
471         used_shnums = set([r.shnum for r in self._active_readers])
472         unused_shnums = known_shnums - used_shnums
473         # We favor lower numbered shares, since FEC is faster with
474         # primary shares than with other shares, and lower-numbered
475         # shares are more likely to be primary than higher numbered
476         # shares.
477         new_shnums = sorted(unused_shnums)[:more]
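        # (e.g. if we need 3 more readers and shares {0, 1, 4, 7} are unused,
        # we pick shares 0, 1 and 4.)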
478         if len(new_shnums) < more and not self._verify:
479             # We don't have enough readers to retrieve the file; fail.
480             self._raise_notenoughshareserror()
481
482         for shnum in new_shnums:
483             reader = self.readers[shnum]
484             self._active_readers.append(reader)
485             self.log("added reader for share %d" % shnum)
486             # Each time we add a reader, we check to see if we need the
487             # private key. If we do, we politely ask for it and then continue
488             # computing. If we find that we haven't gotten it at the end of
489             # segment decoding, then we'll take more drastic measures.
490             if self._need_privkey and not self._node.is_readonly():
491                 d = reader.get_encprivkey()
492                 d.addCallback(self._try_to_validate_privkey, reader)
493                 # XXX: don't just drop the Deferred. We need error-reporting
494                 # but not flow-control here.
495         assert len(self._active_readers) >= self._required_shares
496
497     def _try_to_validate_prefix(self, prefix, reader):
498         """
499         I check that the prefix returned by a candidate server for
500         retrieval matches the prefix that the servermap knows about
501         (and, hence, the prefix that was validated earlier). If it does,
502         I return without incident, which means that I approve of the use
503         of the candidate server for segment retrieval. If it doesn't, I
504         raise UncoordinatedWriteError, and another server must be chosen.
505         """
506         (seqnum,
507          root_hash,
508          IV,
509          segsize,
510          datalength,
511          k,
512          N,
513          known_prefix,
514          offsets_tuple) = self.verinfo
515         if known_prefix != prefix:
516             self.log("prefix from share %d doesn't match" % reader.shnum)
517             raise UncoordinatedWriteError("Mismatched prefix -- this could "
518                                           "indicate an uncoordinated write")
519         # Otherwise, we're okay -- no issues.
520
521
522     def _remove_reader(self, reader):
523         """
524         At various points, we will wish to remove a peer from
525         consideration and/or use. These include, but are not necessarily
526         limited to:
527
528             - A connection error.
529             - A mismatched prefix (that is, a prefix that does not match
530               our conception of the version information string).
531             - A failing block hash, salt hash, or share hash, which can
532               indicate disk failure/bit flips, or network trouble.
533
534         This method will do that. I will make sure that the
535         (shnum,reader) combination represented by my reader argument is
536         not used for anything else during this download. I will not
537         advise the reader of any corruption, something that my callers
538         may wish to do on their own.
539         """
540         # TODO: When you're done writing this, see if this is ever
541         # actually used for something that _mark_bad_share isn't. I have
542         # a feeling that they will be used for very similar things, and
543         # that having them both here is just going to be an epic amount
544         # of code duplication.
545         #
546         # (well, okay, not epic, but meaningful)
547         self.log("removing reader %s" % reader)
548         # Remove the reader from _active_readers
549         self._active_readers.remove(reader)
550         # TODO: self.readers.remove(reader)?
551         for shnum in list(self.remaining_sharemap.keys()):
552             self.remaining_sharemap.discard(shnum, reader.peerid)
553
554
555     def _mark_bad_share(self, reader, f):
556         """
557         I mark the (peerid, shnum) encapsulated by my reader argument as
558         a bad share, which means that it will not be used anywhere else.
559
560         There are several reasons to want to mark something as a bad
561         share. These include:
562
563             - A connection error to the peer.
564             - A mismatched prefix (that is, a prefix that does not match
565               our local conception of the version information string).
566             - A failing block hash, salt hash, share hash, or other
567               integrity check.
568
569         This method will ensure that readers that we wish to mark bad
570         (for these reasons or other reasons) are not used for the rest
571         of the download. Additionally, it will attempt to tell the
572         remote peer (with no guarantee of success) that its share is
573         corrupt.
574         """
575         self.log("marking share %d on server %s as bad" % \
576                  (reader.shnum, reader))
577         prefix = self.verinfo[-2]
578         self.servermap.mark_bad_share(reader.peerid,
579                                       reader.shnum,
580                                       prefix)
581         self._remove_reader(reader)
582         self._bad_shares.add((reader.peerid, reader.shnum, f))
583         self._status.problems[reader.peerid] = f
584         self._last_failure = f
585         self.notify_server_corruption(reader.peerid, reader.shnum,
586                                       str(f.value))
587
588
589     def _download_current_segment(self):
590         """
591         I download, validate, decode, decrypt, and assemble the segment
592         that this Retrieve is currently responsible for downloading.
593         """
594         assert len(self._active_readers) >= self._required_shares
595         if self._current_segment > self._last_segment:
596             # No more segments to download, we're done.
597             self.log("got plaintext, done")
598             return self._done()
599         self.log("on segment %d of %d" %
600                  (self._current_segment + 1, self._num_segments))
601         d = self._process_segment(self._current_segment)
602         d.addCallback(lambda ign: self.loop())
603         return d
604
605     def _process_segment(self, segnum):
606         """
607         I download, validate, decode, and decrypt one segment of the
608         file that this Retrieve is retrieving. This means coordinating
609         the process of getting k blocks of that file, validating them,
610         assembling them into one segment with the decoder, and then
611         decrypting them.
612         """
613         self.log("processing segment %d" % segnum)
614
615         # TODO: The old code uses a marker. Should this code do that
616         # too? What did the Marker do?
617         assert len(self._active_readers) >= self._required_shares
618
619         # We need to ask each of our active readers for its block and
620         # salt. We will then validate those. If validation is
621         # successful, we will assemble the results into plaintext.
622         ds = []
623         for reader in self._active_readers:
624             started = time.time()
625             d = reader.get_block_and_salt(segnum)
626             d2 = self._get_needed_hashes(reader, segnum)
627             dl = defer.DeferredList([d, d2], consumeErrors=True)
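            # _validate_block will receive results shaped like
            # [(success, (block, salt)),
            #  (success, [(success, blockhashes), (success, sharehashes)])].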
628             dl.addCallback(self._validate_block, segnum, reader, started)
629             dl.addErrback(self._validation_or_decoding_failed, [reader])
630             ds.append(dl)
631         dl = defer.DeferredList(ds)
632         if self._verify:
633             dl.addCallback(lambda ignored: "")
634             dl.addCallback(self._set_segment)
635         else:
636             dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
637         return dl
638
639
640     def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
641         """
642         I take the results of fetching and validating the blocks from a
643         callback chain in another method. If the results are such that
644         they tell me that validation and fetching succeeded without
645         incident, I will proceed with decoding and decryption.
646         Otherwise, I will do nothing.
647         """
648         self.log("trying to decode and decrypt segment %d" % segnum)
649         failures = False
650         for block_and_salt in blocks_and_salts:
651             if not block_and_salt[0] or block_and_salt[1] is None:
652                 self.log("some validation operations failed; not proceeding")
653                 failures = True
654                 break
655         if not failures:
656             self.log("everything looks ok, building segment %d" % segnum)
657             d = self._decode_blocks(blocks_and_salts, segnum)
658             d.addCallback(self._decrypt_segment)
659             d.addErrback(self._validation_or_decoding_failed,
660                          self._active_readers)
661             # check to see whether we've been paused before writing
662             # anything.
663             d.addCallback(self._check_for_paused)
664             d.addCallback(self._set_segment)
665             return d
666         else:
667             return defer.succeed(None)
668
669
670     def _set_segment(self, segment):
671         """
672         Given a plaintext segment, I register that segment with the
673         target that is handling the file download.
674         """
675         self.log("got plaintext for segment %d" % self._current_segment)
676         if self._current_segment == self._start_segment:
677             # We're on the first segment. It's possible that we want
678             # only some part of the end of this segment, and that we
679             # just downloaded the whole thing to get that part. If so,
680             # we need to account for that and give the reader just the
681             # data that they want.
682             n = self._offset % self._segment_size
683             self.log("stripping %d bytes off of the first segment" % n)
684             self.log("original segment length: %d" % len(segment))
685             segment = segment[n:]
686             self.log("new segment length: %d" % len(segment))
687
688         if self._current_segment == self._last_segment and self._read_length is not None:
689             # We're on the last segment. It's possible that we only want
690             # part of the beginning of this segment, and that we
691             # downloaded the whole thing anyway. Make sure to give the
692             # caller only the portion of the segment that they want to
693             # receive.
694             extra = self._read_length
695             if self._start_segment != self._last_segment:
696                 extra -= self._segment_size - \
697                             (self._offset % self._segment_size)
698             extra %= self._segment_size
699             self.log("original segment length: %d" % len(segment))
700             segment = segment[:extra]
701             self.log("new segment length: %d" % len(segment))
702             self.log("only taking %d bytes of the last segment" % extra)
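        # (e.g. segsize=1000, offset=2500, read_length=1000: the first segment
        # is stripped to its last 500 bytes and the last segment truncated to
        # its first 500 bytes, yielding exactly 1000 bytes of plaintext.)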
703
704         if not self._verify:
705             self._consumer.write(segment)
706         else:
707             # we don't care about the plaintext if we are doing a verify.
708             segment = None
709         self._current_segment += 1
710
711
712     def _validation_or_decoding_failed(self, f, readers):
713         """
714         I am called when a block or a salt fails to correctly validate, or when
715         the decryption or decoding operation fails for some reason.  I react to
716         this failure by notifying the remote server of corruption, and then
717         removing the remote peer from further activity.
718         """
719         assert isinstance(readers, list)
720         bad_shnums = [reader.shnum for reader in readers]
721
722         self.log("validation or decoding failed on share(s) %s, peer(s) %s "
723                  ", segment %d: %s" % \
724                  (bad_shnums, readers, self._current_segment, str(f)))
725         for reader in readers:
726             self._mark_bad_share(reader, f)
727         return
728
729
730     def _validate_block(self, results, segnum, reader, started):
731         """
732         I validate a block from one share on a remote server.
733         """
734         # Grab the part of the block hash tree that is necessary to
735         # validate this block, then generate the block hash root.
736         self.log("validating share %d for segment %d" % (reader.shnum,
737                                                              segnum))
738         elapsed = time.time() - started
739         self._status.add_fetch_timing(reader.peerid, elapsed)
740         self._set_current_status("validating blocks")
741         # Did we fail to fetch either of the things that we were
742         # supposed to? Fail if so.
743         if not results[0][0] or not results[1][0]:
744             # handled by the errback handler.
745
746             # These all get batched into one query, so the resulting
747             # failure should be the same for all of them, so we can just
748             # use the first one that failed.
749             failed = [r for r in results if not r[0]]
750             assert isinstance(failed[0][1], failure.Failure)
751             f = failed[0][1]
752             raise CorruptShareError(reader.peerid,
753                                     reader.shnum,
754                                     "Connection error: %s" % str(f))
755
756         block_and_salt, block_and_sharehashes = results
757         block, salt = block_and_salt[1]
758         blockhashes, sharehashes = block_and_sharehashes[1]
759
760         blockhashes = dict(enumerate(blockhashes[1]))
761         self.log("the reader gave me the following blockhashes: %s" % \
762                  blockhashes.keys())
763         self.log("the reader gave me the following sharehashes: %s" % \
764                  sharehashes[1].keys())
765         bht = self._block_hash_trees[reader.shnum]
766
767         if bht.needed_hashes(segnum, include_leaf=True):
768             try:
769                 bht.set_hashes(blockhashes)
770             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
771                     IndexError), e:
772                 raise CorruptShareError(reader.peerid,
773                                         reader.shnum,
774                                         "block hash tree failure: %s" % e)
775
776         if self._version == MDMF_VERSION:
777             blockhash = hashutil.block_hash(salt + block)
778         else:
779             blockhash = hashutil.block_hash(block)
780         # If this works without an error, then validation is
781         # successful.
782         try:
783             bht.set_hashes(leaves={segnum: blockhash})
784         except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
785                 IndexError), e:
786             raise CorruptShareError(reader.peerid,
787                                     reader.shnum,
788                                     "block hash tree failure: %s" % e)
789
790         # Reaching this point means that we know that this segment
791         # is correct. Now we need to check to see whether the share
792         # hash chain is also correct. 
793         # SDMF wrote share hash chains that didn't contain the
794         # leaves, which would be produced from the block hash tree.
795         # So we need to validate the block hash tree first. If
796         # successful, then bht[0] will contain the root for the
797         # shnum, which will be a leaf in the share hash tree, which
798         # will allow us to validate the rest of the tree.
799         if self.share_hash_tree.needed_hashes(reader.shnum,
800                                               include_leaf=True) or \
801                                               self._verify:
802             try:
803                 self.share_hash_tree.set_hashes(hashes=sharehashes[1],
804                                             leaves={reader.shnum: bht[0]})
805             except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
806                     IndexError), e:
807                 raise CorruptShareError(reader.peerid,
808                                         reader.shnum,
809                                         "corrupt hashes: %s" % e)
810
811         self.log('share %d is valid for segment %d' % (reader.shnum,
812                                                        segnum))
813         return {reader.shnum: (block, salt)}
814
815
816     def _get_needed_hashes(self, reader, segnum):
817         """
818         I get the hashes needed to validate segnum from the reader, then return
819         to my caller when this is done.
820         """
821         bht = self._block_hash_trees[reader.shnum]
822         needed = bht.needed_hashes(segnum, include_leaf=True)
823         # The root of the block hash tree is also a leaf in the share
824         # hash tree. So we don't need to fetch it from the remote
825         # server. In the case of files with one segment, this means that
826         # we won't fetch any block hash tree from the remote server,
827         # since the hash of each share of the file is the entire block
828         # hash tree, and is a leaf in the share hash tree. This is fine,
829         # since any share corruption will be detected in the share hash
830         # tree.
831         #needed.discard(0)
832         self.log("getting blockhashes for segment %d, share %d: %s" % \
833                  (segnum, reader.shnum, str(needed)))
834         d1 = reader.get_blockhashes(needed, force_remote=True)
835         if self.share_hash_tree.needed_hashes(reader.shnum):
836             need = self.share_hash_tree.needed_hashes(reader.shnum)
837             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
838                                                                  str(need)))
839             d2 = reader.get_sharehashes(need, force_remote=True)
840         else:
841             d2 = defer.succeed({}) # the logic in the next method
842                                    # expects a dict
843         dl = defer.DeferredList([d1, d2], consumeErrors=True)
844         return dl
845
846
847     def _decode_blocks(self, blocks_and_salts, segnum):
848         """
849         I take a list of k blocks and salts, and decode that into a
850         single encrypted segment.
851         """
852         d = {}
853         # We want to merge our dictionaries into the form
854         # {shnum: (block, salt)}
855         #
856         # The dictionaries come from _validate_block in that form, so we
857         # just need to merge them.
858         for block_and_salt in blocks_and_salts:
859             d.update(block_and_salt[1])
860
861         # All of these blocks should have the same salt; in SDMF, it is
862         # the file-wide IV, while in MDMF it is the per-segment salt. In
863         # either case, we just need to get one of them and use it.
864         #
865         # d.items()[0] is like (shnum, (block, salt))
866         # d.items()[0][1] is like (block, salt)
867         # d.items()[0][1][1] is the salt.
868         salt = d.items()[0][1][1]
869         # Next, extract just the blocks from the dict. We'll use the
870         # salt in the next step.
871         share_and_shareids = [(k, v[0]) for k, v in d.items()]
872         d2 = dict(share_and_shareids)
873         shareids = []
874         shares = []
875         for shareid, share in d2.items():
876             shareids.append(shareid)
877             shares.append(share)
878
879         self._set_current_status("decoding")
880         started = time.time()
881         assert len(shareids) >= self._required_shares, len(shareids)
882         # zfec really doesn't want extra shares
883         shareids = shareids[:self._required_shares]
884         shares = shares[:self._required_shares]
885         self.log("decoding segment %d" % segnum)
886         if segnum == self._num_segments - 1:
887             d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
888         else:
889             d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
890         def _process(buffers):
891             segment = "".join(buffers)
892             self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
893                      segnum=segnum,
894                      numsegs=self._num_segments,
895                      level=log.NOISY)
896             self.log(" joined length %d, datalength %d" %
897                      (len(segment), self._data_length))
898             if segnum == self._num_segments - 1:
899                 size_to_use = self._tail_data_size
900             else:
901                 size_to_use = self._segment_size
902             segment = segment[:size_to_use]
903             self.log(" segment len=%d" % len(segment))
904             self._status.accumulate_decode_time(time.time() - started)
905             return segment, salt
906         d.addCallback(_process)
907         return d
908
909
910     def _decrypt_segment(self, segment_and_salt):
911         """
912         I take a single segment and its salt, and decrypt it. I return
913         the plaintext of the segment that is in my argument.
914         """
915         segment, salt = segment_and_salt
916         self._set_current_status("decrypting")
917         self.log("decrypting segment %d" % self._current_segment)
918         started = time.time()
919         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
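        # The AES key is a hash of the salt (SDMF's file-wide IV or the MDMF
        # per-segment salt) and the readkey, so MDMF derives a distinct key
        # for each segment.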
920         decryptor = AES(key)
921         plaintext = decryptor.process(segment)
922         self._status.accumulate_decrypt_time(time.time() - started)
923         return plaintext
924
925
926     def notify_server_corruption(self, peerid, shnum, reason):
927         ss = self.servermap.connections[peerid]
928         ss.callRemoteOnly("advise_corrupt_share",
929                           "mutable", self._storage_index, shnum, reason)
930
931
932     def _try_to_validate_privkey(self, enc_privkey, reader):
933         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
934         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
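        # The writekey is derived by hashing the signing key, so a match here
        # shows that the server returned the genuine (encrypted) private key.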
935         if alleged_writekey != self._node.get_writekey():
936             self.log("invalid privkey from %s shnum %d" %
937                      (reader, reader.shnum),
938                      level=log.WEIRD, umid="YIw4tA")
939             if self._verify:
940                 self.servermap.mark_bad_share(reader.peerid, reader.shnum,
941                                               self.verinfo[-2])
942                 e = CorruptShareError(reader.peerid,
943                                       reader.shnum,
944                                       "invalid privkey")
945                 f = failure.Failure(e)
946                 self._bad_shares.add((reader.peerid, reader.shnum, f))
947             return
948
949         # it's good
950         self.log("got valid privkey from shnum %d on reader %s" %
951                  (reader.shnum, reader))
952         privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
953         self._node._populate_encprivkey(enc_privkey)
954         self._node._populate_privkey(privkey)
955         self._need_privkey = False
956
957
958
959     def _done(self):
960         """
961         I am called by _download_current_segment when the download process
962         has finished successfully. After making some useful logging
963         statements, I return the decrypted contents to the owner of this
964         Retrieve object through self._done_deferred.
965         """
966         self._running = False
967         self._status.set_active(False)
968         now = time.time()
969         self._status.timings['total'] = now - self._started
970         self._status.timings['fetch'] = now - self._started_fetching
971         self._status.set_status("Finished")
972         self._status.set_progress(1.0)
973
974         # remember the encoding parameters, use them again next time
975         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
976          offsets_tuple) = self.verinfo
977         self._node._populate_required_shares(k)
978         self._node._populate_total_shares(N)
979
980         if self._verify:
981             ret = list(self._bad_shares)
982             self.log("done verifying, found %d bad shares" % len(ret))
983         else:
984             # TODO: upload status here?
985             ret = self._consumer
986             self._consumer.unregisterProducer()
987         eventually(self._done_deferred.callback, ret)
988
989
990     def _raise_notenoughshareserror(self):
991         """
992         I am called by _activate_enough_peers when there are not enough
993         active peers left to complete the download. After making some
994         useful logging statements, I throw an exception to that effect
995         to the caller of this Retrieve object through
996         self._done_deferred.
997         """
998
999         format = ("ran out of peers: "
1000                   "have %(have)d of %(total)d segments "
1001                   "found %(bad)d bad shares "
1002                   "encoding %(k)d-of-%(n)d")
1003         args = {"have": self._current_segment,
1004                 "total": self._num_segments,
1005                 "need": self._last_segment,
1006                 "k": self._required_shares,
1007                 "n": self._total_shares,
1008                 "bad": len(self._bad_shares)}
1009         raise NotEnoughSharesError("%s, last failure: %s" %
1010                                    (format % args, str(self._last_failure)))
1011
1012     def _error(self, f):
1013         # all errors, including NotEnoughSharesError, land here
1014         self._running = False
1015         self._status.set_active(False)
1016         now = time.time()
1017         self._status.timings['total'] = now - self._started
1018         self._status.timings['fetch'] = now - self._started_fetching
1019         self._status.set_status("Failed")
1020         eventually(self._done_deferred.errback, f)