]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/retrieve.py
retrieve.py: remove vestigal self._validated_readers
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
index b1ec761efa0f7165c224c15dbe510564417b4aa8..100350a0b62b00925bfae12dddd70b56649665e6 100644 (file)
@@ -24,6 +24,8 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["decode"] = 0.0
+        self.timings["decrypt"] = 0.0
         self.timings["cumulative_verify"] = 0.0
         self.problems = {}
         self.active = True
@@ -59,6 +61,10 @@ class RetrieveStatus:
         if peerid not in self.timings["fetch_per_server"]:
             self.timings["fetch_per_server"][peerid] = []
         self.timings["fetch_per_server"][peerid].append(elapsed)
+    def accumulate_decode_time(self, elapsed):
+        self.timings["decode"] += elapsed
+    def accumulate_decrypt_time(self, elapsed):
+        self.timings["decrypt"] += elapsed
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
@@ -153,6 +159,9 @@ class Retrieve:
             kwargs["facility"] = "tahoe.mutable.retrieve"
         return log.msg(*args, **kwargs)
 
+    def _set_current_status(self, state):
+        seg = "%d/%d" % (self._current_segment, self._last_segment)
+        self._status.set_status("segment %s (%s)" % (seg, state))
 
     ###################
     # IPushProducer
@@ -168,7 +177,7 @@ class Retrieve:
 
         # fired when the download is unpaused. 
         self._old_status = self._status.get_status()
-        self._status.set_status("Paused")
+        self._set_current_status("paused")
 
         self._pause_deferred = defer.Deferred()
 
@@ -213,11 +222,45 @@ class Retrieve:
             self._consumer.registerProducer(self, streaming=True)
 
         self._done_deferred = defer.Deferred()
+        self._offset = offset
+        self._read_length = size
+        self._setup_download()
+        self._setup_encoding_parameters()
+        self.log("starting download")
+        self._started_fetching = time.time()
+        # The download process beyond this is a state machine.
+        # _add_active_peers will select the peers that we want to use
+        # for the download, and then attempt to start downloading. After
+        # each segment, it will check for doneness, reacting to broken
+        # peers and corrupt shares as necessary. If it runs out of good
+        # peers before downloading all of the segments, _done_deferred
+        # will errback.  Otherwise, it will eventually callback with the
+        # contents of the mutable file.
+        self.loop()
+        return self._done_deferred
+
+    def loop(self):
+        d = fireEventually(None) # avoid #237 recursion limit problem
+        d.addCallback(lambda ign: self._activate_enough_peers())
+        d.addCallback(lambda ign: self._download_current_segment())
+        # when we're done, _download_current_segment will call _done. If we
+        # aren't, it will call loop() again.
+        d.addErrback(self._error)
+
+    def _setup_download(self):
         self._started = time.time()
         self._status.set_status("Retrieving Shares")
 
-        self._offset = offset
-        self._read_length = size
+        # how many shares do we need?
+        (seqnum,
+         root_hash,
+         IV,
+         segsize,
+         datalength,
+         k,
+         N,
+         prefix,
+         offsets_tuple) = self.verinfo
 
         # first, which servers can we use?
         versionmap = self.servermap.make_versionmap()
@@ -239,54 +282,18 @@ class Retrieve:
                                        any_cache)
             reader.peerid = peerid
             self.readers[shnum] = reader
-
+        assert len(self.remaining_sharemap) >= k
 
         self.shares = {} # maps shnum to validated blocks
         self._active_readers = [] # list of active readers for this dl.
-        self._validated_readers = set() # set of readers that we have
-                                        # validated the prefix of
         self._block_hash_trees = {} # shnum => hashtree
 
-        # how many shares do we need?
-        (seqnum,
-         root_hash,
-         IV,
-         segsize,
-         datalength,
-         k,
-         N,
-         prefix,
-         offsets_tuple) = self.verinfo
-
-
         # We need one share hash tree for the entire file; its leaves
         # are the roots of the block hash trees for the shares that
         # comprise it, and its root is in the verinfo.
         self.share_hash_tree = hashtree.IncompleteHashTree(N)
         self.share_hash_tree.set_hashes({0: root_hash})
 
-        # This will set up both the segment decoder and the tail segment
-        # decoder, as well as a variety of other instance variables that
-        # the download process will use.
-        self._setup_encoding_parameters()
-        assert len(self.remaining_sharemap) >= k
-
-        self.log("starting download")
-        self._started_fetching = time.time()
-
-        self._add_active_peers()
-
-        # The download process beyond this is a state machine.
-        # _add_active_peers will select the peers that we want to use
-        # for the download, and then attempt to start downloading. After
-        # each segment, it will check for doneness, reacting to broken
-        # peers and corrupt shares as necessary. If it runs out of good
-        # peers before downloading all of the segments, _done_deferred
-        # will errback.  Otherwise, it will eventually callback with the
-        # contents of the mutable file.
-        return self._done_deferred
-
-
     def decode(self, blocks_and_salts, segnum):
         """
         I am a helper method that the mutable file update process uses
@@ -313,7 +320,7 @@ class Retrieve:
     def _setup_encoding_parameters(self):
         """
         I set up the encoding parameters, including k, n, the number
-        of segments associated with this file, and the segment decoder.
+        of segments associated with this file, and the segment decoders.
         """
         (seqnum,
          root_hash,
@@ -395,25 +402,31 @@ class Retrieve:
             self._start_segment = 0
 
 
-        if self._read_length:
+        # If self._read_length is None, then we want to read the whole
+        # file. Otherwise, we want to read only part of the file, and
+        # need to figure out where to stop reading.
+        if self._read_length is not None:
             # our end segment is the last segment containing part of the
             # segment that we were asked to read.
             self.log("got read length %d" % self._read_length)
-            end_data = self._offset + self._read_length
+            if self._read_length != 0:
+                end_data = self._offset + self._read_length
 
-            # We don't actually need to read the byte at end_data, but
-            # the one before it.
-            end = (end_data - 1) // self._segment_size
+                # We don't actually need to read the byte at end_data,
+                # but the one before it.
+                end = (end_data - 1) // self._segment_size
 
-            assert end < self._num_segments
-            self._last_segment = end
+                assert end < self._num_segments
+                self._last_segment = end
+            else:
+                self._last_segment = self._start_segment
             self.log("got end segment: %d" % self._last_segment)
         else:
             self._last_segment = self._num_segments - 1
 
         self._current_segment = self._start_segment
 
-    def _add_active_peers(self):
+    def _activate_enough_peers(self):
         """
         I populate self._active_readers with enough active readers to
         retrieve the contents of this mutable file. I am called before
@@ -441,121 +454,45 @@ class Retrieve:
         #  of scope for MDMF, though.)
 
         # We need at least self._required_shares readers to download a
-        # segment.
+        # segment. If we're verifying, we need all shares.
         if self._verify:
             needed = self._total_shares
         else:
-            needed = self._required_shares - len(self._active_readers)
+            needed = self._required_shares
         # XXX: Why don't format= log messages work here?
         self.log("adding %d peers to the active peers list" % needed)
 
+        if len(self._active_readers) >= needed:
+            # enough shares are active
+            return
+
+        more = needed - len(self._active_readers)
+        known_shnums = set(self.remaining_sharemap.keys())
+        used_shnums = set([r.shnum for r in self._active_readers])
+        unused_shnums = known_shnums - used_shnums
         # We favor lower numbered shares, since FEC is faster with
         # primary shares than with other shares, and lower-numbered
         # shares are more likely to be primary than higher numbered
         # shares.
-        active_shnums = set(sorted(self.remaining_sharemap.keys()))
-        # We shouldn't consider adding shares that we already have; this
-        # will cause problems later.
-        active_shnums -= set([reader.shnum for reader in self._active_readers])
-        active_shnums = list(active_shnums)[:needed]
-        if len(active_shnums) < needed and not self._verify:
+        new_shnums = sorted(unused_shnums)[:more]
+        if len(new_shnums) < more and not self._verify:
             # We don't have enough readers to retrieve the file; fail.
-            return self._failed()
+            self._raise_notenoughshareserror()
 
-        for shnum in active_shnums:
-            self._active_readers.append(self.readers[shnum])
+        for shnum in new_shnums:
+            reader = self.readers[shnum]
+            self._active_readers.append(reader)
             self.log("added reader for share %d" % shnum)
+            # Each time we add a reader, we check to see if we need the
+            # private key. If we do, we politely ask for it and then continue
+            # computing. If we find that we haven't gotten it at the end of
+            # segment decoding, then we'll take more drastic measures.
+            if self._need_privkey and not self._node.is_readonly():
+                d = reader.get_encprivkey()
+                d.addCallback(self._try_to_validate_privkey, reader)
+                # XXX: don't just drop the Deferred. We need error-reporting
+                # but not flow-control here.
         assert len(self._active_readers) >= self._required_shares
-        # Conceptually, this is part of the _add_active_peers step. It
-        # validates the prefixes of newly added readers to make sure
-        # that they match what we are expecting for self.verinfo. If
-        # validation is successful, _validate_active_prefixes will call
-        # _download_current_segment for us. If validation is
-        # unsuccessful, then _validate_prefixes will remove the peer and
-        # call _add_active_peers again, where we will attempt to rectify
-        # the problem by choosing another peer.
-        return self._validate_active_prefixes()
-
-
-    def _validate_active_prefixes(self):
-        """
-        I check to make sure that the prefixes on the peers that I am
-        currently reading from match the prefix that we want to see, as
-        said in self.verinfo.
-
-        If I find that all of the active peers have acceptable prefixes,
-        I pass control to _download_current_segment, which will use
-        those peers to do cool things. If I find that some of the active
-        peers have unacceptable prefixes, I will remove them from active
-        peers (and from further consideration) and call
-        _add_active_peers to attempt to rectify the situation. I keep
-        track of which peers I have already validated so that I don't
-        need to do so again.
-        """
-        assert self._active_readers, "No more active readers"
-
-        ds = []
-        new_readers = set(self._active_readers) - self._validated_readers
-        self.log('validating %d newly-added active readers' % len(new_readers))
-
-        for reader in new_readers:
-            # We force a remote read here -- otherwise, we are relying
-            # on cached data that we already verified as valid, and we
-            # won't detect an uncoordinated write that has occurred
-            # since the last servermap update.
-            d = reader.get_prefix(force_remote=True)
-            d.addCallback(self._try_to_validate_prefix, reader)
-            ds.append(d)
-        dl = defer.DeferredList(ds, consumeErrors=True)
-        def _check_results(results):
-            # Each result in results will be of the form (success, msg).
-            # We don't care about msg, but success will tell us whether
-            # or not the checkstring validated. If it didn't, we need to
-            # remove the offending (peer,share) from our active readers,
-            # and ensure that active readers is again populated.
-            bad_readers = []
-            for i, result in enumerate(results):
-                if not result[0]:
-                    reader = self._active_readers[i]
-                    f = result[1]
-                    assert isinstance(f, failure.Failure)
-
-                    self.log("The reader %s failed to "
-                             "properly validate: %s" % \
-                             (reader, str(f.value)))
-                    bad_readers.append((reader, f))
-                else:
-                    reader = self._active_readers[i]
-                    self.log("the reader %s checks out, so we'll use it" % \
-                             reader)
-                    self._validated_readers.add(reader)
-                    # Each time we validate a reader, we check to see if
-                    # we need the private key. If we do, we politely ask
-                    # for it and then continue computing. If we find
-                    # that we haven't gotten it at the end of
-                    # segment decoding, then we'll take more drastic
-                    # measures.
-                    if self._need_privkey and not self._node.is_readonly():
-                        d = reader.get_encprivkey()
-                        d.addCallback(self._try_to_validate_privkey, reader)
-            if bad_readers:
-                # We do them all at once, or else we screw up list indexing.
-                for (reader, f) in bad_readers:
-                    self._mark_bad_share(reader, f)
-                if self._verify:
-                    if len(self._active_readers) >= self._required_shares:
-                        return self._download_current_segment()
-                    else:
-                        return self._failed()
-                else:
-                    return self._add_active_peers()
-            else:
-                return self._download_current_segment()
-            # The next step will assert that it has enough active
-            # readers to fetch shares; we just need to remove it.
-        dl.addCallback(_check_results)
-        return dl
-
 
     def _try_to_validate_prefix(self, prefix, reader):
         """
@@ -655,23 +592,16 @@ class Retrieve:
         that this Retrieve is currently responsible for downloading.
         """
         assert len(self._active_readers) >= self._required_shares
-        if self._current_segment <= self._last_segment:
-            d = self._process_segment(self._current_segment)
-        else:
-            d = defer.succeed(None)
-        d.addBoth(self._turn_barrier)
-        d.addCallback(self._check_for_done)
+        if self._current_segment > self._last_segment:
+            # No more segments to download, we're done.
+            self.log("got plaintext, done")
+            return self._done()
+        self.log("on segment %d of %d" %
+                 (self._current_segment + 1, self._num_segments))
+        d = self._process_segment(self._current_segment)
+        d.addCallback(lambda ign: self.loop())
         return d
 
-
-    def _turn_barrier(self, result):
-        """
-        I help the download process avoid the recursion limit issues
-        discussed in #237.
-        """
-        return fireEventually(result)
-
-
     def _process_segment(self, segnum):
         """
         I download, validate, decode, and decrypt one segment of the
@@ -692,13 +622,12 @@ class Retrieve:
         ds = []
         for reader in self._active_readers:
             started = time.time()
-            d = reader.get_block_and_salt(segnum, queue=True)
+            d = reader.get_block_and_salt(segnum)
             d2 = self._get_needed_hashes(reader, segnum)
             dl = defer.DeferredList([d, d2], consumeErrors=True)
             dl.addCallback(self._validate_block, segnum, reader, started)
             dl.addErrback(self._validation_or_decoding_failed, [reader])
             ds.append(dl)
-            reader.flush()
         dl = defer.DeferredList(ds)
         if self._verify:
             dl.addCallback(lambda ignored: "")
@@ -806,8 +735,9 @@ class Retrieve:
         # validate this block, then generate the block hash root.
         self.log("validating share %d for segment %d" % (reader.shnum,
                                                              segnum))
-        self._status.add_fetch_timing(reader.peerid, started)
-        self._status.set_status("Valdiating blocks for segment %d" % segnum)
+        elapsed = time.time() - started
+        self._status.add_fetch_timing(reader.peerid, elapsed)
+        self._set_current_status("validating blocks")
         # Did we fail to fetch either of the things that we were
         # supposed to? Fail if so.
         if not results[0][0] and results[1][0]:
@@ -901,12 +831,12 @@ class Retrieve:
         #needed.discard(0)
         self.log("getting blockhashes for segment %d, share %d: %s" % \
                  (segnum, reader.shnum, str(needed)))
-        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
+        d1 = reader.get_blockhashes(needed, force_remote=True)
         if self.share_hash_tree.needed_hashes(reader.shnum):
             need = self.share_hash_tree.needed_hashes(reader.shnum)
             self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                  str(need)))
-            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
+            d2 = reader.get_sharehashes(need, force_remote=True)
         else:
             d2 = defer.succeed({}) # the logic in the next method
                                    # expects a dict
@@ -946,7 +876,7 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
-        self._status.set_status("Decoding")
+        self._set_current_status("decoding")
         started = time.time()
         assert len(shareids) >= self._required_shares, len(shareids)
         # zfec really doesn't want extra shares
@@ -971,8 +901,7 @@ class Retrieve:
                 size_to_use = self._segment_size
             segment = segment[:size_to_use]
             self.log(" segment len=%d" % len(segment))
-            self._status.timings.setdefault("decode", 0)
-            self._status.timings['decode'] = time.time() - started
+            self._status.accumulate_decode_time(time.time() - started)
             return segment, salt
         d.addCallback(_process)
         return d
@@ -984,14 +913,13 @@ class Retrieve:
         the plaintext of the segment that is in my argument.
         """
         segment, salt = segment_and_salt
-        self._status.set_status("decrypting")
+        self._set_current_status("decrypting")
         self.log("decrypting segment %d" % self._current_segment)
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
         decryptor = AES(key)
         plaintext = decryptor.process(segment)
-        self._status.timings.setdefault("decrypt", 0)
-        self._status.timings['decrypt'] = time.time() - started
+        self._status.accumulate_decrypt_time(time.time() - started)
         return plaintext
 
 
@@ -1027,50 +955,11 @@ class Retrieve:
         self._need_privkey = False
 
 
-    def _check_for_done(self, res):
-        """
-        I check to see if this Retrieve object has successfully finished
-        its work.
-
-        I can exit in the following ways:
-            - If there are no more segments to download, then I exit by
-              causing self._done_deferred to fire with the plaintext
-              content requested by the caller.
-            - If there are still segments to be downloaded, and there
-              are enough active readers (readers which have not broken
-              and have not given us corrupt data) to continue
-              downloading, I send control back to
-              _download_current_segment.
-            - If there are still segments to be downloaded but there are
-              not enough active peers to download them, I ask
-              _add_active_peers to add more peers. If it is successful,
-              it will call _download_current_segment. If there are not
-              enough peers to retrieve the file, then that will cause
-              _done_deferred to errback.
-        """
-        self.log("checking for doneness")
-        if self._current_segment > self._last_segment:
-            # No more segments to download, we're done.
-            self.log("got plaintext, done")
-            return self._done()
-
-        if len(self._active_readers) >= self._required_shares:
-            # More segments to download, but we have enough good peers
-            # in self._active_readers that we can do that without issue,
-            # so go nab the next segment.
-            self.log("not done yet: on segment %d of %d" % \
-                     (self._current_segment + 1, self._num_segments))
-            return self._download_current_segment()
-
-        self.log("not done yet: on segment %d of %d, need to add peers" % \
-                 (self._current_segment + 1, self._num_segments))
-        return self._add_active_peers()
-
 
     def _done(self):
         """
-        I am called by _check_for_done when the download process has
-        finished successfully. After making some useful logging
+        I am called by _download_current_segment when the download process
+        has finished successfully. After making some useful logging
         statements, I return the decrypted contents to the owner of this
         Retrieve object through self._done_deferred.
         """
@@ -1079,6 +968,8 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Finished")
+        self._status.set_progress(1.0)
 
         # remember the encoding parameters, use them again next time
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@@ -1096,35 +987,34 @@ class Retrieve:
         eventually(self._done_deferred.callback, ret)
 
 
-    def _failed(self):
+    def _raise_notenoughshareserror(self):
         """
-        I am called by _add_active_peers when there are not enough
+        I am called by _activate_enough_peers when there are not enough
         active peers left to complete the download. After making some
-        useful logging statements, I return an exception to that effect
+        useful logging statements, I throw an exception to that effect
         to the caller of this Retrieve object through
         self._done_deferred.
         """
+
+        format = ("ran out of peers: "
+                  "have %(have)d of %(total)d segments "
+                  "found %(bad)d bad shares "
+                  "encoding %(k)d-of-%(n)d")
+        args = {"have": self._current_segment,
+                "total": self._num_segments,
+                "need": self._last_segment,
+                "k": self._required_shares,
+                "n": self._total_shares,
+                "bad": len(self._bad_shares)}
+        raise NotEnoughSharesError("%s, last failure: %s" %
+                                   (format % args, str(self._last_failure)))
+
+    def _error(self, f):
+        # all errors, including NotEnoughSharesError, land here
         self._running = False
         self._status.set_active(False)
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
-
-        if self._verify:
-            ret = list(self._bad_shares)
-        else:
-            format = ("ran out of peers: "
-                      "have %(have)d of %(total)d segments "
-                      "found %(bad)d bad shares "
-                      "encoding %(k)d-of-%(n)d")
-            args = {"have": self._current_segment,
-                    "total": self._num_segments,
-                    "need": self._last_segment,
-                    "k": self._required_shares,
-                    "n": self._total_shares,
-                    "bad": len(self._bad_shares)}
-            e = NotEnoughSharesError("%s, last failure: %s" % \
-                                     (format % args, str(self._last_failure)))
-            f = failure.Failure(e)
-            ret = f
-        eventually(self._done_deferred.callback, ret)
+        self._status.set_status("Failed")
+        eventually(self._done_deferred.errback, f)