git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/mutable/retrieve.py
Fix mutable publish/retrieve timing status displays. Fixes #1505.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / mutable / retrieve.py
index 166eec48807546e7b6c32e0d99e5b3b317425a72..7498a134dd014ddb243e06236905d4668382c4b4 100644 (file)
@@ -24,6 +24,8 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["decode"] = 0.0
+        self.timings["decrypt"] = 0.0
         self.timings["cumulative_verify"] = 0.0
         self.problems = {}
         self.active = True
@@ -59,6 +61,10 @@ class RetrieveStatus:
         if peerid not in self.timings["fetch_per_server"]:
             self.timings["fetch_per_server"][peerid] = []
         self.timings["fetch_per_server"][peerid].append(elapsed)
+    def accumulate_decode_time(self, elapsed):
+        self.timings["decode"] += elapsed
+    def accumulate_decrypt_time(self, elapsed):
+        self.timings["decrypt"] += elapsed
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
@@ -105,9 +111,8 @@ class Retrieve:
         # during repair, we may be called upon to grab the private key, since
         # it wasn't picked up during a verify=False checker run, and we'll
         # need it for repair to generate a new version.
-        self._need_privkey = fetch_privkey or verify
-        if self._node.get_privkey() and not verify:
-            self._need_privkey = False
+        self._need_privkey = verify or (fetch_privkey
+                                        and not self._node.get_privkey())
 
         if self._need_privkey:
             # TODO: Evaluate the need for this. We'll use it if we want
@@ -126,9 +131,7 @@ class Retrieve:
         # 3. When we are validating readers, we need to validate the
         #    signature on the prefix. Do we? We already do this in the
         #    servermap update?
-        self._verify = False
-        if verify:
-            self._verify = True
+        self._verify = verify
 
         self._status = RetrieveStatus()
         self._status.set_storage_index(self._storage_index)
@@ -140,8 +143,7 @@ class Retrieve:
         self._status.set_size(datalength)
         self._status.set_encoding(k, N)
         self.readers = {}
-        self._paused = False
-        self._paused_deferred = None
+        self._pause_deferred = None
         self._offset = None
         self._read_length = None
         self.log("got seqnum %d" % self.verinfo[0])
@@ -157,6 +159,9 @@ class Retrieve:
             kwargs["facility"] = "tahoe.mutable.retrieve"
         return log.msg(*args, **kwargs)
 
+    def _set_current_status(self, state):
+        seg = "%d/%d" % (self._current_segment, self._last_segment)
+        self._status.set_status("segment %s (%s)" % (seg, state))
 
     ###################
     # IPushProducer
@@ -167,15 +172,14 @@ class Retrieve:
         data for it to handle. I make the downloader stop producing new
         data until my resumeProducing method is called.
         """
-        if self._paused:
+        if self._pause_deferred is not None:
             return
 
         # fired when the download is unpaused. 
         self._old_status = self._status.get_status()
-        self._status.set_status("Paused")
+        self._set_current_status("paused")
 
         self._pause_deferred = defer.Deferred()
-        self._paused = True
 
 
     def resumeProducing(self):
@@ -183,10 +187,9 @@ class Retrieve:
         I am called by my download target once it is ready to begin
         receiving data again.
         """
-        if not self._paused:
+        if self._pause_deferred is None:
             return
 
-        self._paused = False
         p = self._pause_deferred
         self._pause_deferred = None
         self._status.set_status(self._old_status)
@@ -202,7 +205,7 @@ class Retrieve:
         the Deferred fires immediately. Otherwise, the Deferred fires
         when the downloader is unpaused.
         """
-        if self._paused:
+        if self._pause_deferred is not None:
             d = defer.Deferred()
             self._pause_deferred.addCallback(lambda ignored: d.callback(res))
             return d
@@ -278,10 +281,10 @@ class Retrieve:
         assert len(self.remaining_sharemap) >= k
 
         self.log("starting download")
-        self._paused = False
         self._started_fetching = time.time()
 
         self._add_active_peers()
+
         # The download process beyond this is a state machine.
         # _add_active_peers will select the peers that we want to use
         # for the download, and then attempt to start downloading. After
@@ -302,11 +305,11 @@ class Retrieve:
         segment with. I return the plaintext associated with that
         segment.
         """
-        # shnum => block hash tree. Unusued, but setup_encoding_parameters will
+        # shnum => block hash tree. Unused, but setup_encoding_parameters will
         # want to set this.
-        # XXX: Make it so that it won't set this if we're just decoding.
-        self._block_hash_trees = {}
+        self._block_hash_trees = None
         self._setup_encoding_parameters()
+
         # This is the form expected by decode.
         blocks_and_salts = blocks_and_salts.items()
         blocks_and_salts = [(True, [d]) for d in blocks_and_salts]
@@ -370,9 +373,10 @@ class Retrieve:
                  (k, n, self._num_segments, self._segment_size,
                   self._tail_segment_size))
 
-        for i in xrange(self._total_shares):
-            # So we don't have to do this later.
-            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
+        if self._block_hash_trees is not None:
+            for i in xrange(self._total_shares):
+                # So we don't have to do this later.
+                self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
 
         # Our last task is to tell the downloader where to start and
         # where to stop. We use three parameters for that:
@@ -391,11 +395,7 @@ class Retrieve:
             self.log("got offset: %d" % self._offset)
             # our start segment is the first segment containing the
             # offset we were given. 
-            start = mathutil.div_ceil(self._offset,
-                                      self._segment_size)
-            # this gets us the first segment after self._offset. Then
-            # our start segment is the one before it.
-            start -= 1
+            start = self._offset // self._segment_size
 
             assert start < self._num_segments
             self._start_segment = start
@@ -409,9 +409,11 @@ class Retrieve:
             # segment that we were asked to read.
             self.log("got read length %d" % self._read_length)
             end_data = self._offset + self._read_length
-            end = mathutil.div_ceil(end_data,
-                                    self._segment_size)
-            end -= 1
+
+            # We don't actually need to read the byte at end_data, but
+            # the one before it.
+            end = (end_data - 1) // self._segment_size
+
             assert end < self._num_segments
             self._last_segment = end
             self.log("got end segment: %d" % self._last_segment)
@@ -813,8 +815,9 @@ class Retrieve:
         # validate this block, then generate the block hash root.
         self.log("validating share %d for segment %d" % (reader.shnum,
                                                              segnum))
-        self._status.add_fetch_timing(reader.peerid, started)
-        self._status.set_status("Valdiating blocks for segment %d" % segnum)
+        elapsed = time.time() - started
+        self._status.add_fetch_timing(reader.peerid, elapsed)
+        self._set_current_status("validating blocks")
         # Did we fail to fetch either of the things that we were
         # supposed to? Fail if so.
         if not results[0][0] and results[1][0]:
@@ -953,7 +956,7 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
-        self._status.set_status("Decoding")
+        self._set_current_status("decoding")
         started = time.time()
         assert len(shareids) >= self._required_shares, len(shareids)
         # zfec really doesn't want extra shares
@@ -978,8 +981,7 @@ class Retrieve:
                 size_to_use = self._segment_size
             segment = segment[:size_to_use]
             self.log(" segment len=%d" % len(segment))
-            self._status.timings.setdefault("decode", 0)
-            self._status.timings['decode'] = time.time() - started
+            self._status.accumulate_decode_time(time.time() - started)
             return segment, salt
         d.addCallback(_process)
         return d
@@ -991,14 +993,13 @@ class Retrieve:
         the plaintext of the segment that is in my argument.
         """
         segment, salt = segment_and_salt
-        self._status.set_status("decrypting")
+        self._set_current_status("decrypting")
         self.log("decrypting segment %d" % self._current_segment)
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
         decryptor = AES(key)
         plaintext = decryptor.process(segment)
-        self._status.timings.setdefault("decrypt", 0)
-        self._status.timings['decrypt'] = time.time() - started
+        self._status.accumulate_decrypt_time(time.time() - started)
         return plaintext
 
 
@@ -1086,6 +1087,14 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Finished")
+        self._status.set_progress(1.0)
+
+        # remember the encoding parameters, use them again next time
+        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+         offsets_tuple) = self.verinfo
+        self._node._populate_required_shares(k)
+        self._node._populate_total_shares(N)
 
         if self._verify:
             ret = list(self._bad_shares)
@@ -1110,6 +1119,7 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Failed")
 
         if self._verify:
             ret = list(self._bad_shares)