]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable: fix control flow to allow good+bad shares from a peer. Fixes #211.
authorBrian Warner <warner@allmydata.com>
Sat, 17 Nov 2007 00:12:33 +0000 (17:12 -0700)
committerBrian Warner <warner@allmydata.com>
Sat, 17 Nov 2007 00:12:33 +0000 (17:12 -0700)
src/allmydata/mutable.py

index 6253c51b7bd2e6742daf14e3cdcbdd8077f8b573..3fbe6fb9f4c9c66aec46253ddcaa1a9494b7574f 100644 (file)
@@ -342,9 +342,9 @@ class Retrieve:
             d.addCallback(_got_storageserver)
         d.addCallback(lambda ss: ss.callRemote("slot_readv", storage_index,
                                                [], [(0, readsize)]))
-        d.addCallback(self._got_results, peerid, readsize)
-        d.addErrback(self._query_failed, peerid, (conn, storage_index,
-                                                  peer_storage_servers))
+        d.addCallback(self._got_results, peerid, readsize,
+                      (conn, storage_index, peer_storage_servers))
+        d.addErrback(self._query_failed, peerid)
         # errors that aren't handled by _query_failed (and errors caused by
         # _query_failed) get logged, but we still want to check for doneness.
         d.addErrback(log.err)
@@ -355,7 +355,7 @@ class Retrieve:
         verifier = rsa.create_verifying_key_from_string(pubkey_s)
         return verifier
 
-    def _got_results(self, datavs, peerid, readsize):
+    def _got_results(self, datavs, peerid, readsize, stuff):
         self._queries_outstanding.discard(peerid)
         self._used_peers.add(peerid)
         if not self._running:
@@ -363,77 +363,88 @@ class Retrieve:
 
         for shnum,datav in datavs.items():
             data = datav[0]
-            self.log("_got_results: got shnum #%d from peerid %s"
-                     % (shnum, idlib.shortnodeid_b2a(peerid)))
-            (seqnum, root_hash, IV, k, N, segsize, datalength,
-             pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
-
-            if not self._pubkey:
-                fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-                if fingerprint != self._node._fingerprint:
-                    # bad share
-                    raise CorruptShareError(peerid, shnum,
-                                            "pubkey doesn't match fingerprint")
-                self._pubkey = self._deserialize_pubkey(pubkey_s)
-                self._node._populate_pubkey(self._pubkey)
-
-            verinfo = (seqnum, root_hash, IV, segsize, datalength)
-            if verinfo not in self._valid_versions:
-                # it's a new pair. Verify the signature.
-                valid = self._pubkey.verify(prefix, signature)
-                if not valid:
-                    raise CorruptShareError(peerid, shnum,
-                                            "signature is invalid")
-                # ok, it's a valid verinfo. Add it to the list of validated
-                # versions.
-                self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
-                         % (seqnum, idlib.b2a(root_hash)[:4],
-                            idlib.shortnodeid_b2a(peerid), shnum,
-                            k, N, segsize, datalength))
-                self._valid_versions[verinfo] = (prefix, DictOfSets())
-
-                # and make a note of the other parameters we've just learned
-                if self._required_shares is None:
-                    self._required_shares = k
-                    self._node._populate_required_shares(k)
-                if self._total_shares is None:
-                    self._total_shares = N
-                    self._node._populate_total_shares(N)
-
-            # we've already seen this pair, and checked the signature so we
-            # know it's a valid candidate. Accumulate the share info, if
-            # there's enough data present. If not, raise NeedMoreDataError,
-            # which will trigger a re-fetch.
-            _ignored = unpack_share(data)
-            self.log(" found enough data to add share contents")
-            self._valid_versions[verinfo][1].add(shnum, (peerid, data))
-
-
-    def _query_failed(self, f, peerid, stuff):
-        self._queries_outstanding.discard(peerid)
-        self._used_peers.add(peerid)
+            try:
+                self._got_results_one_share(shnum, data, peerid)
+            except NeedMoreDataError, e:
+                # ah, just re-send the query then.
+                self._read_size = max(self._read_size, e.needed_bytes)
+                # TODO: for MDMF, sanity-check self._read_size: don't let one
+                # server cause us to try to read gigabytes of data from all
+                # other servers.
+                (conn, storage_index, peer_storage_servers) = stuff
+                self._do_query(conn, peerid, storage_index, self._read_size,
+                               peer_storage_servers)
+                return
+            except CorruptShareError, e:
+                # log it and give the other shares a chance to be processed
+                f = failure.Failure()
+                self.log("WEIRD: bad share: %s %s" % (f, f.value))
+                self._bad_peerids.add(peerid)
+                self._last_failure = f
+                pass
+        # all done!
+
+    def _got_results_one_share(self, shnum, data, peerid):
+        self.log("_got_results: got shnum #%d from peerid %s"
+                 % (shnum, idlib.shortnodeid_b2a(peerid)))
+        (seqnum, root_hash, IV, k, N, segsize, datalength,
+         # this might raise NeedMoreDataError, in which case the rest of
+         # the shares are probably short too. _query_failed() will take
+         # responsiblity for re-issuing the queries with a new length.
+         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
+
+        if not self._pubkey:
+            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+            if fingerprint != self._node._fingerprint:
+                raise CorruptShareError(peerid, shnum,
+                                        "pubkey doesn't match fingerprint")
+            self._pubkey = self._deserialize_pubkey(pubkey_s)
+            self._node._populate_pubkey(self._pubkey)
+
+        verinfo = (seqnum, root_hash, IV, segsize, datalength)
+        if verinfo not in self._valid_versions:
+            # it's a new pair. Verify the signature.
+            valid = self._pubkey.verify(prefix, signature)
+            if not valid:
+                raise CorruptShareError(peerid, shnum,
+                                        "signature is invalid")
+            # ok, it's a valid verinfo. Add it to the list of validated
+            # versions.
+            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
+                     % (seqnum, idlib.b2a(root_hash)[:4],
+                        idlib.shortnodeid_b2a(peerid), shnum,
+                        k, N, segsize, datalength))
+            self._valid_versions[verinfo] = (prefix, DictOfSets())
+
+            # and make a note of the other parameters we've just learned
+            if self._required_shares is None:
+                self._required_shares = k
+                self._node._populate_required_shares(k)
+            if self._total_shares is None:
+                self._total_shares = N
+                self._node._populate_total_shares(N)
+
+        # we've already seen this pair, and checked the signature so we
+        # know it's a valid candidate. Accumulate the share info, if
+        # there's enough data present. If not, raise NeedMoreDataError,
+        # which will trigger a re-fetch.
+        _ignored = unpack_share(data)
+        self.log(" found enough data to add share contents")
+        self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+
+
+    def _query_failed(self, f, peerid):
         if not self._running:
             return
-        if f.check(NeedMoreDataError):
-            # ah, just re-send the query then.
-            self._read_size = max(self._read_size, f.value.needed_bytes)
-            (conn, storage_index, peer_storage_servers) = stuff
-            self._do_query(conn, peerid, storage_index, self._read_size,
-                           peer_storage_servers)
-            return
+        self._queries_outstanding.discard(peerid)
+        self._used_peers.add(peerid)
         self._last_failure = f
         self._bad_peerids.add(peerid)
-        short_sid = idlib.b2a(self._storage_index)[:6]
-        if f.check(CorruptShareError):
-            self.log("WEIRD: bad share for %s: %s %s" % (short_sid, f,
-                                                         f.value))
-        else:
-            self.log("WEIRD: other error for %s: %s %s" % (short_sid, f,
-                                                           f.value))
+        self.log("WEIRD: error during query: %s %s" % (f, f.value))
 
     def _check_for_done(self, res):
         if not self._running:
-            self.log("UNUSUAL: _check_for_done but we're not running")
+            self.log("ODD: _check_for_done but we're not running")
             return
         share_prefixes = {}
         versionmap = DictOfSets()