]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable: tolerate multiple encodings, using whichever version is recoverable first...
authorBrian Warner <warner@lothar.com>
Tue, 11 Mar 2008 07:26:00 +0000 (00:26 -0700)
committerBrian Warner <warner@lothar.com>
Tue, 11 Mar 2008 07:26:00 +0000 (00:26 -0700)
src/allmydata/mutable.py
src/allmydata/test/test_mutable.py

index e24372b88ea173f017af505d5a79b1d1c9acf566..b9792fde8e265cd63a37e940e902c2486e85232f 100644 (file)
@@ -387,22 +387,19 @@ class Retrieve:
 
         initial_query_count = 5
 
-        # we might not know how many shares we need yet.
-        self._required_shares = self._node.get_required_shares()
-        self._total_shares = self._node.get_total_shares()
-
         # self._valid_versions is a dictionary in which the keys are
-        # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
-        # a new potential version of the file, we check its signature, and
-        # the valid ones are added to this dictionary. The values of the
-        # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
-        # the first part of the share (containing the serialized verinfo),
-        # for easier comparison. 'sharemap' is a DictOfSets, in which the
-        # keys are sharenumbers, and the values are sets of (peerid, data)
-        # tuples. There is a (peerid, data) tuple for every instance of a
-        # given share that we've seen. The 'data' in this tuple is a full
-        # copy of the SDMF share, starting with the \x00 version byte and
-        # continuing through the last byte of sharedata.
+        # 'verinfo' tuples (seqnum, root_hash, IV, segsize, datalength, k,
+        # N). Every time we hear about a new potential version of the file,
+        # we check its signature, and the valid ones are added to this
+        # dictionary. The values of the dictionary are (prefix, sharemap)
+        # tuples, where 'prefix' is just the first part of the share
+        # (containing the serialized verinfo), for easier comparison.
+        # 'sharemap' is a DictOfSets, in which the keys are sharenumbers, and
+        # the values are sets of (peerid, data) tuples. There is a (peerid,
+        # data) tuple for every instance of a given share that we've seen.
+        # The 'data' in this tuple is a full copy of the SDMF share, starting
+        # with the \x00 version byte and continuing through the last byte of
+        # sharedata.
         self._valid_versions = {}
 
         # self._valid_shares is a dict mapping (peerid,data) tuples to
@@ -544,7 +541,7 @@ class Retrieve:
             self._pubkey = self._deserialize_pubkey(pubkey_s)
             self._node._populate_pubkey(self._pubkey)
 
-        verinfo = (seqnum, root_hash, IV, segsize, datalength) #, k, N)
+        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N)
         self._status.sharemap[peerid].add(verinfo)
 
         if verinfo not in self._valid_versions:
@@ -569,42 +566,9 @@ class Retrieve:
                         k, N, segsize, datalength))
             self._valid_versions[verinfo] = (prefix, DictOfSets())
 
-            # and make a note of the other parameters we've just learned
-            # NOTE: Retrieve needs to be refactored to put k,N in the verinfo
-            # along with seqnum/etc, to make sure we don't co-mingle shares
-            # from differently-encoded versions of the same file.
-            if self._required_shares is None:
-                self._required_shares = k
-                self._node._populate_required_shares(k)
-            if self._total_shares is None:
-                self._total_shares = N
-                self._node._populate_total_shares(N)
-
-        # reject shares that don't match our narrow-minded ideas of what
-        # encoding we're going to use. This addresses the immediate needs of
-        # ticket #312, by turning the data corruption into unavailability. To
-        # get back the availability (i.e. make sure that one weird-encoding
-        # share that happens to come back first doesn't make us ignore the
-        # rest of the shares), we need to implement the refactoring mentioned
-        # above.
-        if k != self._required_shares:
-            self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
-                                            % (shnum, k, self._required_shares)
-            raise CorruptShareError(peerid, shnum,
-                                    "share has k=%d, we want k=%d" %
-                                    (k, self._required_shares))
-
-        if N != self._total_shares:
-            self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
-                                            % (shnum, N, self._total_shares)
-            raise CorruptShareError(peerid, shnum,
-                                    "share has N=%d, we want N=%d" %
-                                    (N, self._total_shares))
-
-        # we've already seen this pair, and checked the signature so we
-        # know it's a valid candidate. Accumulate the share info, if
-        # there's enough data present. If not, raise NeedMoreDataError,
-        # which will trigger a re-fetch.
+        # We now know that this is a valid candidate verinfo. Accumulate the
+        # share info, if there's enough data present. If not, raise
+        # NeedMoreDataError, which will trigger a re-fetch.
         _ignored = unpack_share_data(data)
         self.log(" found enough data to add share contents")
         self._valid_versions[verinfo][1].add(shnum, (peerid, data))
@@ -625,11 +589,14 @@ class Retrieve:
             return
         share_prefixes = {}
         versionmap = DictOfSets()
+        max_N = 0
         for verinfo, (prefix, sharemap) in self._valid_versions.items():
             # sharemap is a dict that maps shnums to sets of (peerid,data).
             # len(sharemap) is the number of distinct shares that appear to
             # be available.
-            if len(sharemap) >= self._required_shares:
+            (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
+            max_N = max(max_N, N)
+            if len(sharemap) >= k:
                 # this one looks retrievable. TODO: our policy of decoding
                 # the first version that we can get is a bit troublesome: in
                 # a small grid with a large expansion factor, a single
@@ -674,11 +641,11 @@ class Retrieve:
 
         # no more queries are outstanding. Can we send out more? First,
         # should we be looking at more peers?
-        self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" %
-                 (self._total_shares, len(self._peerlist),
+        self.log("need more peers: max_N=%s, peerlist=%d peerlist_limit=%d" %
+                 (max_N, len(self._peerlist),
                   self._peerlist_limit), level=log.UNUSUAL)
-        if self._total_shares is not None:
-            search_distance = self._total_shares * 2
+        if max_N:
+            search_distance = max_N * 2
         else:
             search_distance = 20
         self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
@@ -721,9 +688,9 @@ class Retrieve:
 
     def _attempt_decode(self, verinfo, sharemap):
         # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
-        (seqnum, root_hash, IV, segsize, datalength) = verinfo
+        (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
 
-        assert len(sharemap) >= self._required_shares, len(sharemap)
+        assert len(sharemap) >= k, len(sharemap)
 
         shares_s = []
         for shnum in sorted(sharemap.keys()):
@@ -784,8 +751,8 @@ class Retrieve:
         # it's now down to doing FEC and decrypt.
         elapsed = time.time() - self._started
         self._status.timings["fetch"] = elapsed
-        assert len(shares) >= self._required_shares, len(shares)
-        d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
+        assert len(shares) >= k, len(shares)
+        d = defer.maybeDeferred(self._decode, shares, segsize, datalength, k, N)
         d.addCallback(self._decrypt, IV, seqnum, root_hash)
         return d
 
@@ -818,10 +785,7 @@ class Retrieve:
         self.log(" data valid! len=%d" % len(share_data))
         return share_data
 
-    def _decode(self, shares_dict, segsize, datalength):
-        # we ought to know these values by now
-        assert self._required_shares is not None
-        assert self._total_shares is not None
+    def _decode(self, shares_dict, segsize, datalength, k, N):
 
         # shares_dict is a dict mapping shnum to share data, but the codec
         # wants two lists.
@@ -830,14 +794,13 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
-        assert len(shareids) >= self._required_shares, len(shareids)
+        assert len(shareids) >= k, len(shareids)
         # zfec really doesn't want extra shares
-        shareids = shareids[:self._required_shares]
-        shares = shares[:self._required_shares]
+        shareids = shareids[:k]
+        shares = shares[:k]
 
         fec = codec.CRSDecoder()
-        params = "%d-%d-%d" % (segsize,
-                               self._required_shares, self._total_shares)
+        params = "%d-%d-%d" % (segsize, k, N)
         fec.set_serialized_params(params)
 
         self.log("params %s, we have %d shares" % (params, len(shares)))
@@ -847,7 +810,12 @@ class Retrieve:
         def _done(buffers):
             elapsed = time.time() - started
             self._status.timings["decode"] = elapsed
-            self._status.set_encoding(self._required_shares, self._total_shares)
+            self._status.set_encoding(k, N)
+
+            # stash these in the MutableFileNode to speed up the next pass
+            self._node._populate_required_shares(k)
+            self._node._populate_total_shares(N)
+
             self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             self.log(" joined length %d, datalength %d" %
index f9380d66b43b8594d0edb5eca1265a01d5444204..1ebb0df4540b0a0979300f73cf3c8251bdbc616e 100644 (file)
@@ -830,10 +830,8 @@ class Roundtrip(unittest.TestCase):
             return r3.retrieve()
         d.addCallback(_retrieve)
         def _retrieved(new_contents):
-            ## the current specified behavior is "first version recoverable"
-            #self.failUnlessEqual(new_contents, contents1)
-            # the current behavior is "first version seen is sticky"
-            self.failUnlessEqual(new_contents, contents2)
+            # the current specified behavior is "first version recoverable"
+            self.failUnlessEqual(new_contents, contents1)
         d.addCallback(_retrieved)
         return d