mutable: get most of the retrieve-side code written. no tests yet.
author Brian Warner <warner@lothar.com>
Tue, 6 Nov 2007 10:47:29 +0000 (03:47 -0700)
committer Brian Warner <warner@lothar.com>
Tue, 6 Nov 2007 10:47:29 +0000 (03:47 -0700)
src/allmydata/mutable.py

index dec8d0c3446f68aae465f998eeb50e7ac3e1254a..a1f48a3d12afbd1f095a9186a516fac695df8015 100644 (file)
@@ -1,9 +1,12 @@
 
-import os, struct, itertools
+import os, struct
+from itertools import islice
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.python import failure
+from foolscap.eventual import eventually
 from allmydata.interfaces import IMutableFileNode, IMutableFileURI
-from allmydata.util import hashutil, mathutil
+from allmydata.util import hashutil, mathutil, idlib
 from allmydata.uri import WriteableSSKFileURI
 from allmydata.Crypto.Cipher import AES
 from allmydata import hashtree, codec
@@ -18,8 +21,49 @@ class NeedMoreDataError(Exception):
 class UncoordinatedWriteError(Exception):
     pass
 
+class CorruptShareError(Exception):
+    def __init__(self, peerid, shnum, reason):
+        self.peerid = peerid
+        self.shnum = shnum
+        self.reason = reason
+    def __repr__(self):
+        # TODO: in some places we use idlib.b2a, in others (foolscap) we use
+        # stdlib b32encode. Fix this discrepancy.
+        short_peerid = idlib.b2a(self.peerid)[:8]
+        return "<CorruptShareError peerid=%s shnum[%d]: %s" % (short_peerid,
+                                                               self.shnum,
+                                                               self.reason)
+
 HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
 
+def unpack_prefix_and_signature(data):
+    assert len(data) >= HEADER_LENGTH
+    o = {}
+    prefix = data[:struct.calcsize(">BQ32s BBQQ")]
+
+    (version,
+     seqnum,
+     root_hash,
+     k, N, segsize, datalen,
+     o['signature'],
+     o['share_hash_chain'],
+     o['block_hash_tree'],
+     o['IV'],
+     o['share_data'],
+     o['enc_privkey'],
+     o['EOF']) = struct.unpack(">BQ32s BBQQ LLLLLQQ",
+                               data[:HEADER_LENGTH])
+
+    assert version == 0
+    if len(data) < o['share_hash_chain']:
+        raise NeedMoreDataError(o['share_hash_chain'])
+
+    pubkey_s = data[HEADER_LENGTH:o['signature']]
+    signature = data[o['signature']:o['share_hash_chain']]
+
+    return (seqnum, root_hash, k, N, segsize, datalen,
+            pubkey_s, signature, prefix)
+
 def unpack_share(data):
     assert len(data) >= HEADER_LENGTH
     o = {}
@@ -199,6 +243,8 @@ class MutableFileNode:
         return self._writekey
     def get_readkey(self):
         return self._readkey
+    def get_storage_index(self):
+        return self._storage_index
     def get_privkey(self):
         return self._privkey
     def get_encprivkey(self):
@@ -251,6 +297,258 @@ class MutableFileNode:
 class Retrieve:
     def __init__(self, filenode):
         self._node = filenode
+        self._contents = None
+        # if the filenode already has a copy of the pubkey, use it. Otherwise
+        # we'll grab a copy from the first peer we talk to.
+        self._pubkey = filenode.get_pubkey()
+        self._storage_index = filenode.get_storage_index()
+
+    def retrieve(self):
+        """Retrieve the filenode's current contents. Returns a Deferred that
+        fires with a string when the contents have been retrieved."""
+
+        # 1: make a guess as to how many peers we should send requests to. We
+        #    want to hear from k+EPSILON (k because we have to, EPSILON extra
+        #    because that helps us resist rollback attacks). [TRADEOFF:
+        #    EPSILON>0 means extra work] [TODO: implement EPSILON>0]
+        # 2: build the permuted peerlist, taking the first k+E peers
+        # 3: send readv requests to all of them in parallel, asking for the
+        #    first 2KB of data from all shares
+        # 4: when the first of the responses comes back, extract information:
+        # 4a: extract the pubkey, hash it, compare against the URI. If this
+        #     check fails, log a WEIRD and ignore the peer.
+        # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
+        #     and verify the signature on it. If this is wrong, log a WEIRD
+        #     and ignore the peer. Save the prefix string in a dict that's
+        #     keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
+        #     values. We'll use the prefixstring again later to avoid doing
+        #     multiple signature checks
+        # 4c: extract the share size (offset of the last byte of sharedata).
+        #     if it is larger than 2k, send new readv requests to pull down
+        #     the extra data
+        # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
+        #     permuted peerlist and send out more readv requests.
+        # 5: as additional responses come back, extract the prefix and compare
+        #    against the ones we've already seen. If they match, add the
+        #    peerid to the corresponding sharemap dict
+        # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
+        #    same (seqnum,roothash) key, attempt to reconstruct that data.
+        #    if EPSILON>0, wait for k+EPSILON responses, then attempt to
+        #    reconstruct the most popular version. If we do not have enough
+        #    shares and there are still requests outstanding, wait. If there
+        #    are not still requests outstanding (todo: configurable), send
+        #    more requests. Never send queries to more than 2*N servers. If
+        #    we've run out of servers, fail.
+        # 7: if we discover corrupt shares during the reconstruction process,
+        #    remove that share from the sharemap and start step #6 again.
+
+        initial_query_count = 5
+        self._read_size = 2000
+
+        # we might not know how many shares we need yet.
+        self._required_shares = self._node.get_required_shares()
+        self._total_shares = self._node.get_total_shares()
+        self._segsize = None
+        self._datalength = None
+
+        d = defer.succeed(initial_query_count)
+        d.addCallback(self._choose_initial_peers)
+        d.addCallback(self._send_initial_requests)
+        d.addCallback(lambda res: self._contents)
+        return d
+
+    def _choose_initial_peers(self, numqueries):
+        n = self._node
+        full_peerlist = n._client.get_permuted_peers(self._storage_index,
+                                                     include_myself=True)
+        # _peerlist is a list of (peerid,conn) tuples for peers that are
+        # worth talking to. This starts with the first numqueries in the
+        # permuted list. If that's not enough to get us a recoverable
+        # version, we expand this to include the first 2*total_shares peerids
+        # (assuming we learn what total_shares is from one of the first
+        # numqueries peers)
+        self._peerlist = [(p[1],p[2])
+                          for p in islice(full_peerlist, numqueries)]
+        # _peerlist_limit is the query limit we used to build this list. If
+        # we later increase this limit, it may be useful to re-scan the
+        # permuted list.
+        self._peerlist_limit = numqueries
+        return self._peerlist
+
+    def _send_initial_requests(self, peerlist):
+        self._bad_peerids = set()
+        self._running = True
+        self._queries_outstanding = set()
+        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
+        self._peer_storage_servers = {}
+        dl = []
+        for (permutedid, peerid, conn) in peerlist:
+            self._queries_outstanding.add(peerid)
+            self._do_query(conn, peerid, self._storage_index, self._read_size,
+                           self._peer_storage_servers)
+
+        # control flow beyond this point: state machine. Receiving responses
+        # from queries is the input. We might send out more queries, or we
+        # might produce a result.
+
+        d = self._done_deferred = defer.Deferred()
+        return d
+
+    def _do_query(self, conn, peerid, storage_index, readsize,
+                  peer_storage_servers):
+        self._queries_outstanding.add(peerid)
+        if peerid in peer_storage_servers:
+            d = defer.succeed(peer_storage_servers[peerid])
+        else:
+            d = conn.callRemote("get_service", "storageserver")
+            def _got_storageserver(ss):
+                peer_storage_servers[peerid] = ss
+                return ss
+            d.addCallback(_got_storageserver)
+        d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)]))
+        d.addCallback(self._got_results, peerid, readsize)
+        d.addErrback(self._query_failed, peerid, (conn, storage_index,
+                                                  peer_storage_servers))
+        return d
+
+    def _deserialize_pubkey(self, pubkey_s):
+        # TODO
+        return None
+
+    def _got_results(self, datavs, peerid, readsize):
+        self._queries_outstanding.discard(peerid)
+        self._used_peers.add(peerid)
+        if not self._running:
+            return
+
+        for shnum,datav in datavs.items():
+            data = datav[0]
+            (seqnum, root_hash, k, N, segsize, datalength,
+             pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
+
+            if not self._pubkey:
+                fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+                if fingerprint != self._node._fingerprint:
+                    # bad share
+                    raise CorruptShareError(peerid,
+                                            "pubkey doesn't match fingerprint")
+                self._pubkey = self._deserialize_pubkey(pubkey_s)
+
+            verinfo = (seqnum, root_hash)
+            if verinfo not in self._valid_versions:
+                # it's a new pair. Verify the signature.
+                valid = self._pubkey.verify(prefix, signature)
+                if not valid:
+                    raise CorruptShareError(peerid,
+                                            "signature is invalid")
+                # ok, it's a valid verinfo. Add it to the list of validated
+                # versions.
+                self._valid_versions[verinfo] = (prefix, DictOfSets())
+
+                # and make a note of the other parameters we've just learned
+                if self._required_shares is None:
+                    self._required_shares = k
+                if self._total_shares is None:
+                    self._total_shares = N
+                if self._segsize is None:
+                    self._segsize = segsize
+                if self._datalength is None:
+                    self._datalength = datalength
+
+            # we've already seen this pair, and checked the signature so we
+            # know it's a valid candidate. Accumulate the share info, if
+            # there's enough data present. If not, raise NeedMoreDataError,
+            # which will trigger a re-fetch.
+            _ignored = unpack_share(data)
+            self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+
+        self._check_for_done()
+
+
+    def _query_failed(self, f, peerid, stuff):
+        self._queries_outstanding.discard(peerid)
+        self._used_peers.add(peerid)
+        if not self._running:
+            return
+        if f.check(NeedMoreDataError):
+            # ah, just re-send the query then.
+            self._read_size = max(self._read_size, f.needed_bytes)
+            (conn, storage_index, peer_storage_servers) = stuff
+            self._do_query(conn, peerid, storage_index, self._read_size,
+                           peer_storage_servers)
+            return
+        self._bad_peerids.add(peerid)
+        short_sid = idlib.a2b(self.storage_index)[:6]
+        if f.check(CorruptShareError):
+            self._node._client.log("WEIRD: bad share for %s: %s" %
+                                   (short_sid, f))
+        else:
+            self._node._client.log("WEIRD: other error for %s: %s" %
+                                   (short_sid, f))
+        self._check_for_done()
+
+    def _check_for_done(self):
+        share_prefixes = {}
+        versionmap = DictOfSets()
+        for prefix, sharemap in self._valid_versions.values():
+            if len(sharemap) >= self._required_shares:
+                # this one looks retrievable
+                try:
+                    contents = self._extract_data(sharemap)
+                except CorruptShareError:
+                    # log(WEIRD)
+                    # _extract_data is responsible for removing the bad
+                    # share, so we can just try again
+                    return self._check_for_done()
+                # success!
+                return self._done(contents)
+        # we don't have enough shares yet. Should we send out more queries?
+        if self._queries_outstanding:
+            # there are some running, so just wait for them to come back.
+            # TODO: if our initial guess at k was too low, waiting for these
+            # responses before sending new queries will increase our latency,
+            # so we could speed things up by sending new requests earlier.
+            return
+
+        # no more queries are outstanding. Can we send out more? First,
+        # should we be looking at more peers?
+        if self._total_shares is not None:
+            search_distance = self._total_shares * 2
+        else:
+            search_distance = 20
+        if self._peerlist_limit < search_distance:
+            # we might be able to get some more peers from the list
+            peers = self._node._client.get_permuted_peers(self._storage_index,
+                                                          include_myself=True)
+            self._peerlist = [(p[1],p[2])
+                              for p in islice(peers, search_distance)]
+            self._peerlist_limit = search_distance
+        # are there any peers on the list that we haven't used?
+        new_query_peers = []
+        for (peerid, conn) in self._peerlist:
+            if peerid not in self._used_peers:
+                new_query_peers.append( (peerid, conn) )
+                if len(new_query_peers) > 5:
+                    # only query in batches of 5. TODO: this is pretty
+                    # arbitrary, really I want this to be something like
+                    # k - max(known_version_sharecounts) + some extra
+                    break
+        if new_query_peers:
+            for (peerid, conn) in new_query_peers:
+                self._do_query(conn, peerid,
+                               self._storage_index, self._read_size,
+                               self._peer_storage_servers)
+            # we'll retrigger when those queries come back
+            return
+
+        # we've used up all the peers we're allowed to search. Failure.
+        return self._done(failure.Failure(NotEnoughPeersError()))
+
+    def _done(self, contents):
+        self._running = False
+        eventually(self._done_deferred.callback, contents)
+
+
 
 class DictOfSets(dict):
     def add(self, key, value):
@@ -417,7 +715,7 @@ class Publish:
         self._new_root_hash = root_hash
         self._new_shares = final_shares
 
-        storage_index = self._node._uri.storage_index
+        storage_index = self._node.get_storage_index()
         peerlist = self._node._client.get_permuted_peers(storage_index,
                                                          include_myself=False)
         # we don't include ourselves in the N peers, but we *do* push an
@@ -425,12 +723,13 @@ class Publish:
         # the signing key around later. This way, even if all the servers die
         # and the directory contents are unrecoverable, at least we can still
         # push out a new copy with brand-new contents.
+        # TODO: actually push this copy
 
         current_share_peers = DictOfSets()
         reachable_peers = {}
 
         EPSILON = total_shares / 2
-        partial_peerlist = itertools.islice(peerlist, total_shares + EPSILON)
+        partial_peerlist = islice(peerlist, total_shares + EPSILON)
         peer_storage_servers = {}
         dl = []
         for (permutedid, peerid, conn) in partial_peerlist: