rearrange Retrieve: first step, shouldn't change order of execution
diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py
index b1ec761efa0f7165c224c15dbe510564417b4aa8..25930c8e90705ba5533f372ac271d1e09a266cae 100644
--- a/src/allmydata/mutable/retrieve.py
+++ b/src/allmydata/mutable/retrieve.py
@@ -24,6 +24,8 @@ class RetrieveStatus:
     def __init__(self):
         self.timings = {}
         self.timings["fetch_per_server"] = {}
+        self.timings["decode"] = 0.0
+        self.timings["decrypt"] = 0.0
         self.timings["cumulative_verify"] = 0.0
         self.problems = {}
         self.active = True
@@ -59,6 +61,10 @@ class RetrieveStatus:
         if peerid not in self.timings["fetch_per_server"]:
             self.timings["fetch_per_server"][peerid] = []
         self.timings["fetch_per_server"][peerid].append(elapsed)
+    def accumulate_decode_time(self, elapsed):
+        self.timings["decode"] += elapsed
+    def accumulate_decrypt_time(self, elapsed):
+        self.timings["decrypt"] += elapsed
     def set_storage_index(self, si):
         self.storage_index = si
     def set_helper(self, helper):
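
These two accumulators replace the overwrite pattern removed further down in this patch, where each segment's `time.time() - started` clobbered the previous value, so `timings['decode']` and `timings['decrypt']` only ever reflected the final segment. A minimal sketch of the difference, with made-up per-segment times:

timings = {"decode": 0.0}
per_segment = [0.12, 0.30, 0.25]   # hypothetical elapsed times

# old pattern (removed below): the last assignment wins
for elapsed in per_segment:
    timings["decode"] = elapsed    # ends at 0.25

# new pattern: accumulate across segments
timings["decode"] = 0.0
for elapsed in per_segment:
    timings["decode"] += elapsed   # ends at 0.67
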
@@ -153,6 +159,9 @@ class Retrieve:
             kwargs["facility"] = "tahoe.mutable.retrieve"
         return log.msg(*args, **kwargs)
 
+    def _set_current_status(self, state):
+        seg = "%d/%d" % (self._current_segment, self._last_segment)
+        self._status.set_status("segment %s (%s)" % (seg, state))
 
     ###################
     # IPushProducer
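
A note on the `_set_current_status` helper added above: the status updates below now funnel through it, so a status display reads, say, `segment 3/10 (decrypting)` rather than a bare `decrypting` (the 3/10 is illustrative; `_last_segment` is the index of the last segment to be fetched).
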
@@ -168,7 +177,7 @@ class Retrieve:
 
         # fired when the download is unpaused. 
         self._old_status = self._status.get_status()
-        self._status.set_status("Paused")
+        self._set_current_status("paused")
 
         self._pause_deferred = defer.Deferred()
 
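
For context, `_pause_deferred` implements the usual Twisted producer gate: `pauseProducing` creates a Deferred, the download chains its next step behind it, and `resumeProducing` fires it. A minimal self-contained sketch of the pattern (the class and the `_check_for_paused` hook are illustrative, though they follow the shape of this file):

from twisted.internet import defer

class PauseGate(object):
    def __init__(self):
        self._pause_deferred = None

    def pauseProducing(self):
        if self._pause_deferred is None:
            self._pause_deferred = defer.Deferred()

    def resumeProducing(self):
        d, self._pause_deferred = self._pause_deferred, None
        if d is not None:
            d.callback(None)   # release everything queued behind the gate

    def _check_for_paused(self, res):
        # called between download steps: while paused, park the result
        # on a fresh Deferred that fires only after resumeProducing()
        if self._pause_deferred is not None:
            d = defer.Deferred()
            self._pause_deferred.addCallback(lambda ign: d.callback(res))
            return d
        return res
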
@@ -213,11 +222,38 @@ class Retrieve:
             self._consumer.registerProducer(self, streaming=True)
 
         self._done_deferred = defer.Deferred()
+        self._offset = offset
+        self._read_length = size
+        self._setup_download()
+        self._setup_encoding_parameters()
+        self.log("starting download")
+        self._started_fetching = time.time()
+        d = self._add_active_peers()
+        # ...
+        # The download process beyond this is a state machine.
+        # _add_active_peers will select the peers that we want to use
+        # for the download, and then attempt to start downloading. After
+        # each segment, it will check for doneness, reacting to broken
+        # peers and corrupt shares as necessary. If it runs out of good
+        # peers before downloading all of the segments, _done_deferred
+        # will errback.  Otherwise, it will eventually callback with the
+        # contents of the mutable file.
+        return self._done_deferred
+
+    def _setup_download(self):
         self._started = time.time()
         self._status.set_status("Retrieving Shares")
 
-        self._offset = offset
-        self._read_length = size
+        # how many shares do we need?
+        (seqnum,
+         root_hash,
+         IV,
+         segsize,
+         datalength,
+         k,
+         N,
+         prefix,
+         offsets_tuple) = self.verinfo
 
         # first, which servers can we use?
         versionmap = self.servermap.make_versionmap()
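
The long comment in the hunk above describes the state machine that `_add_active_peers` starts. As a rough Deferred-chain skeleton (every name except `_add_active_peers` is a hypothetical stand-in, and the real code reacts to broken peers between steps rather than in a plain loop):

from twisted.internet import defer

def _download(self):
    d = self._add_active_peers()           # choose k usable peers
    for segnum in range(self._start_segment, self._last_segment + 1):
        # fetch, validate, decode, and decrypt one segment; swap out
        # broken peers / corrupt shares before moving on
        d.addCallback(lambda ign, s=segnum: self._process_segment(s))
    d.addCallbacks(self._done, self._fail) # plaintext, or out of peers
    return d
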
@@ -239,7 +275,7 @@ class Retrieve:
                                        any_cache)
             reader.peerid = peerid
             self.readers[shnum] = reader
-
+        assert len(self.remaining_sharemap) >= k
 
         self.shares = {} # maps shnum to validated blocks
         self._active_readers = [] # list of active readers for this dl.
@@ -247,46 +283,12 @@ class Retrieve:
                                         # validated the prefix of
         self._block_hash_trees = {} # shnum => hashtree
 
-        # how many shares do we need?
-        (seqnum,
-         root_hash,
-         IV,
-         segsize,
-         datalength,
-         k,
-         N,
-         prefix,
-         offsets_tuple) = self.verinfo
-
-
         # We need one share hash tree for the entire file; its leaves
         # are the roots of the block hash trees for the shares that
         # comprise it, and its root is in the verinfo.
         self.share_hash_tree = hashtree.IncompleteHashTree(N)
         self.share_hash_tree.set_hashes({0: root_hash})
 
-        # This will set up both the segment decoder and the tail segment
-        # decoder, as well as a variety of other instance variables that
-        # the download process will use.
-        self._setup_encoding_parameters()
-        assert len(self.remaining_sharemap) >= k
-
-        self.log("starting download")
-        self._started_fetching = time.time()
-
-        self._add_active_peers()
-
-        # The download process beyond this is a state machine.
-        # _add_active_peers will select the peers that we want to use
-        # for the download, and then attempt to start downloading. After
-        # each segment, it will check for doneness, reacting to broken
-        # peers and corrupt shares as necessary. If it runs out of good
-        # peers before downloading all of the segments, _done_deferred
-        # will errback.  Otherwise, it will eventually callback with the
-        # contents of the mutable file.
-        return self._done_deferred
-
-
     def decode(self, blocks_and_salts, segnum):
         """
         I am a helper method that the mutable file update process uses
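
To make the comment about the share hash tree concrete: each share has a block hash tree over its blocks, the roots of those per-share trees are the leaves of the single share hash tree, and that tree's root must equal the `root_hash` from `verinfo`. A toy sketch with `hashlib` (Tahoe's `hashtree` module uses tagged SHA-256d hashes and pads to a power of two, so the digests below are structurally, not byte-for-byte, representative):

import hashlib

def h(*parts):
    # stand-in for Tahoe's tagged hashes
    return hashlib.sha256(b"".join(parts)).digest()

def merkle_root(leaves):
    layer = list(leaves)
    while len(layer) > 1:
        if len(layer) % 2:
            layer.append(layer[-1])   # toy padding; Tahoe pads differently
        layer = [h(layer[i], layer[i + 1])
                 for i in range(0, len(layer), 2)]
    return layer[0]

# hypothetical file split into 3 shares of 2 blocks each
shares = [[b"blk00", b"blk01"], [b"blk10", b"blk11"], [b"blk20", b"blk21"]]

# one block hash tree per share ...
block_tree_roots = [merkle_root([h(b) for b in blocks]) for blocks in shares]
# ... whose roots are the leaves of the one share hash tree; its root
# is what verinfo carries as root_hash
root_hash = merkle_root(block_tree_roots)
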
@@ -313,7 +315,7 @@ class Retrieve:
     def _setup_encoding_parameters(self):
         """
         I set up the encoding parameters, including k, n, the number
-        of segments associated with this file, and the segment decoder.
+        of segments associated with this file, and the segment decoders.
         """
         (seqnum,
          root_hash,
@@ -395,18 +397,24 @@ class Retrieve:
             self._start_segment = 0
 
 
-        if self._read_length:
+        # If self._read_length is None, then we want to read the whole
+        # file. Otherwise, we want to read only part of the file, and
+        # need to figure out where to stop reading.
+        if self._read_length is not None:
             # our end segment is the last segment containing part of the
             # segment that we were asked to read.
             self.log("got read length %d" % self._read_length)
-            end_data = self._offset + self._read_length
+            if self._read_length != 0:
+                end_data = self._offset + self._read_length
 
-            # We don't actually need to read the byte at end_data, but
-            # the one before it.
-            end = (end_data - 1) // self._segment_size
+                # We don't actually need to read the byte at end_data,
+                # but the one before it.
+                end = (end_data - 1) // self._segment_size
 
-            assert end < self._num_segments
-            self._last_segment = end
+                assert end < self._num_segments
+                self._last_segment = end
+            else:
+                self._last_segment = self._start_segment
             self.log("got end segment: %d" % self._last_segment)
         else:
             self._last_segment = self._num_segments - 1
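
The `is not None` / `!= 0` pair distinguishes two cases the old truthiness test conflated: `None` means read the whole file, while `0` means read no data at all (the old `if self._read_length:` sent a zero-length read down the whole-file branch, and feeding 0 through the arithmetic instead would give `end = (offset - 1) // segment_size`, one segment too early, or -1 at offset 0). A worked example of the arithmetic, with a hypothetical 4-byte segment size:

segment_size = 4                       # hypothetical tiny segments
offset, read_length = 5, 6             # bytes 5 through 10 inclusive

end_data = offset + read_length        # 11, one past the last needed byte
end = (end_data - 1) // segment_size   # byte 10 lives in segment 2
assert end == 2

# the zero-length read the patch special-cases: no data is wanted, so
# the last segment collapses onto the start segment instead of being
# computed from end_data == offset (which would give (5 - 1) // 4 == 1
# here, and -1 when offset == 0)
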
@@ -806,8 +814,9 @@ class Retrieve:
         # validate this block, then generate the block hash root.
         self.log("validating share %d for segment %d" % (reader.shnum,
                                                              segnum))
-        self._status.add_fetch_timing(reader.peerid, started)
-        self._status.set_status("Valdiating blocks for segment %d" % segnum)
+        elapsed = time.time() - started
+        self._status.add_fetch_timing(reader.peerid, elapsed)
+        self._set_current_status("validating blocks")
         # Did we fail to fetch either of the things that we were
         # supposed to? Fail if so.
         if not results[0][0] or not results[1][0]:
@@ -946,7 +955,7 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
-        self._status.set_status("Decoding")
+        self._set_current_status("decoding")
         started = time.time()
         assert len(shareids) >= self._required_shares, len(shareids)
         # zfec really doesn't want extra shares
@@ -971,8 +980,7 @@ class Retrieve:
                 size_to_use = self._segment_size
             segment = segment[:size_to_use]
             self.log(" segment len=%d" % len(segment))
-            self._status.timings.setdefault("decode", 0)
-            self._status.timings['decode'] = time.time() - started
+            self._status.accumulate_decode_time(time.time() - started)
             return segment, salt
         d.addCallback(_process)
         return d
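
The comment above ("zfec really doesn't want extra shares") refers to the decoder's contract: it must be handed exactly `k` blocks together with their share numbers, no more. A minimal round trip with the `zfec` package directly (the real code goes through Tahoe's codec wrapper, which wraps this in a Deferred):

import zfec

k, m = 3, 10
encoder = zfec.Encoder(k, m)
segment = b"AAAABBBBCCCC"
# k equal-sized primary blocks in, all m shares out
blocks = encoder.encode([segment[i:i + 4] for i in range(0, 12, 4)],
                        list(range(m)))

# suppose only shares 1, 5, and 8 survived: hand zfec exactly k of
# them, plus their share numbers, and never any extras
decoder = zfec.Decoder(k, m)
primary = decoder.decode([blocks[i] for i in (1, 5, 8)], [1, 5, 8])
assert b"".join(primary) == segment
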
@@ -984,14 +992,13 @@ class Retrieve:
         the plaintext of the segment that is in my argument.
         """
         segment, salt = segment_and_salt
-        self._status.set_status("decrypting")
+        self._set_current_status("decrypting")
         self.log("decrypting segment %d" % self._current_segment)
         started = time.time()
         key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
         decryptor = AES(key)
         plaintext = decryptor.process(segment)
-        self._status.timings.setdefault("decrypt", 0)
-        self._status.timings['decrypt'] = time.time() - started
+        self._status.accumulate_decrypt_time(time.time() - started)
         return plaintext
 
 
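
Because the key is derived per segment from `(salt, readkey)`, segments decrypt independently, and since pycryptopp's `AES` runs in counter mode from a zero counter, the same `process` call performs both encryption and decryption. A minimal sketch using the same helpers as the code above (assuming `ciphertext`, `salt`, and `readkey` are in hand):

from allmydata.util import hashutil
from pycryptopp.cipher.aes import AES

def decrypt_segment(ciphertext, salt, readkey):
    # per-segment key, derived exactly as in the code above
    key = hashutil.ssk_readkey_data_hash(salt, readkey)
    return AES(key).process(ciphertext)
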
@@ -1079,6 +1086,8 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Finished")
+        self._status.set_progress(1.0)
 
         # remember the encoding parameters, use them again next time
         (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
@@ -1109,6 +1118,7 @@ class Retrieve:
         now = time.time()
         self._status.timings['total'] = now - self._started
         self._status.timings['fetch'] = now - self._started_fetching
+        self._status.set_status("Failed")
 
         if self._verify:
             ret = list(self._bad_shares)