From: Brian Warner Date: Tue, 6 Nov 2007 10:47:29 +0000 (-0700) Subject: mutable: get most of the retrieve-side code written. no tests yet. X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/bar.txt?a=commitdiff_plain;h=207888a97bf7cb0d21be9312fb031bf5b14e4de7;p=tahoe-lafs%2Ftahoe-lafs.git mutable: get most of the retrieve-side code written. no tests yet. --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index dec8d0c3..a1f48a3d 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1,9 +1,12 @@ -import os, struct, itertools +import os, struct +from itertools import islice from zope.interface import implements from twisted.internet import defer +from twisted.python import failure +from foolscap.eventual import eventually from allmydata.interfaces import IMutableFileNode, IMutableFileURI -from allmydata.util import hashutil, mathutil +from allmydata.util import hashutil, mathutil, idlib from allmydata.uri import WriteableSSKFileURI from allmydata.Crypto.Cipher import AES from allmydata import hashtree, codec @@ -18,8 +21,49 @@ class NeedMoreDataError(Exception): class UncoordinatedWriteError(Exception): pass +class CorruptShareError(Exception): + def __init__(self, peerid, shnum, reason): + self.peerid = peerid + self.shnum = shnum + self.reason = reason + def __repr__(self): + # TODO: in some places we use idlib.b2a, in others (foolscap) we use + # stdlib b32encode. Fix this discrepancy. + short_peerid = idlib.b2a(self.peerid)[:8] + return "BQ32s BBQQ LLLLLQQ") +def unpack_prefix_and_signature(data): + assert len(data) >= HEADER_LENGTH + o = {} + prefix = data[:struct.calcsize(">BQ32s BBQQ")] + + (version, + seqnum, + root_hash, + k, N, segsize, datalen, + o['signature'], + o['share_hash_chain'], + o['block_hash_tree'], + o['IV'], + o['share_data'], + o['enc_privkey'], + o['EOF']) = struct.unpack(">BQ32s BBQQ LLLLLQQ", + data[:HEADER_LENGTH]) + + assert version == 0 + if len(data) < o['share_hash_chain']: + raise NeedMoreDataError(o['share_hash_chain']) + + pubkey_s = data[HEADER_LENGTH:o['signature']] + signature = data[o['signature']:o['share_hash_chain']] + + return (seqnum, root_hash, k, N, segsize, datalen, + pubkey_s, signature, prefix) + def unpack_share(data): assert len(data) >= HEADER_LENGTH o = {} @@ -199,6 +243,8 @@ class MutableFileNode: return self._writekey def get_readkey(self): return self._readkey + def get_storage_index(self): + return self._storage_index def get_privkey(self): return self._privkey def get_encprivkey(self): @@ -251,6 +297,258 @@ class MutableFileNode: class Retrieve: def __init__(self, filenode): self._node = filenode + self._contents = None + # if the filenode already has a copy of the pubkey, use it. Otherwise + # we'll grab a copy from the first peer we talk to. + self._pubkey = filenode.get_pubkey() + self._storage_index = filenode.get_storage_index() + + def retrieve(self): + """Retrieve the filenode's current contents. Returns a Deferred that + fires with a string when the contents have been retrieved.""" + + # 1: make a guess as to how many peers we should send requests to. We + # want to hear from k+EPSILON (k because we have to, EPSILON extra + # because that helps us resist rollback attacks). [TRADEOFF: + # EPSILON>0 means extra work] [TODO: implement EPSILON>0] + # 2: build the permuted peerlist, taking the first k+E peers + # 3: send readv requests to all of them in parallel, asking for the + # first 2KB of data from all shares + # 4: when the first of the responses comes back, extract information: + # 4a: extract the pubkey, hash it, compare against the URI. If this + # check fails, log a WEIRD and ignore the peer. + # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength) + # and verify the signature on it. If this is wrong, log a WEIRD + # and ignore the peer. Save the prefix string in a dict that's + # keyed by (seqnum,roothash) and has (prefixstring, sharemap) as + # values. We'll use the prefixstring again later to avoid doing + # multiple signature checks + # 4c: extract the share size (offset of the last byte of sharedata). + # if it is larger than 2k, send new readv requests to pull down + # the extra data + # 4d: if the extracted 'k' is more than we guessed, rebuild a larger + # permuted peerlist and send out more readv requests. + # 5: as additional responses come back, extract the prefix and compare + # against the ones we've already seen. If they match, add the + # peerid to the corresponing sharemap dict + # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the + # same (seqnum,roothash) key, attempt to reconstruct that data. + # if EPSILON>0, wait for k+EPSILON responses, then attempt to + # reconstruct the most popular version.. If we do not have enough + # shares and there are still requests outstanding, wait. If there + # are not still requests outstanding (todo: configurable), send + # more requests. Never send queries to more than 2*N servers. If + # we've run out of servers, fail. + # 7: if we discover corrupt shares during the reconstruction process, + # remove that share from the sharemap. and start step#6 again. + + initial_query_count = 5 + self._read_size = 2000 + + # we might not know how many shares we need yet. + self._required_shares = self._node.get_required_shares() + self._total_shares = self._node.get_total_shares() + self._segsize = None + self._datalength = None + + d = defer.succeed(initial_query_count) + d.addCallback(self._choose_initial_peers) + d.addCallback(self._send_initial_requests) + d.addCallback(lambda res: self._contents) + return d + + def _choose_initial_peers(self, numqueries): + n = self._node + full_peerlist = n._client.get_permuted_peers(self._storage_index, + include_myself=True) + # _peerlist is a list of (peerid,conn) tuples for peers that are + # worth talking too. This starts with the first numqueries in the + # permuted list. If that's not enough to get us a recoverable + # version, we expand this to include the first 2*total_shares peerids + # (assuming we learn what total_shares is from one of the first + # numqueries peers) + self._peerlist = [(p[1],p[2]) + for p in islice(full_peerlist, numqueries)] + # _peerlist_limit is the query limit we used to build this list. If + # we later increase this limit, it may be useful to re-scan the + # permuted list. + self._peerlist_limit = numqueries + return self._peerlist + + def _send_initial_requests(self, peerlist): + self._bad_peerids = set() + self._running = True + self._queries_outstanding = set() + self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..] + self._peer_storage_servers = {} + dl = [] + for (permutedid, peerid, conn) in peerlist: + self._queries_outstanding.add(peerid) + self._do_query(conn, peerid, self._storage_index, self._read_size, + self._peer_storage_servers) + + # control flow beyond this point: state machine. Receiving responses + # from queries is the input. We might send out more queries, or we + # might produce a result. + + d = self._done_deferred = defer.Deferred() + return d + + def _do_query(self, conn, peerid, storage_index, readsize, + peer_storage_servers): + self._queries_outstanding.add(peerid) + if peerid in peer_storage_servers: + d = defer.succeed(peer_storage_servers[peerid]) + else: + d = conn.callRemote("get_service", "storageserver") + def _got_storageserver(ss): + peer_storage_servers[peerid] = ss + return ss + d.addCallback(_got_storageserver) + d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)])) + d.addCallback(self._got_results, peerid, readsize) + d.addErrback(self._query_failed, peerid, (conn, storage_index, + peer_storage_servers)) + return d + + def _deserialize_pubkey(self, pubkey_s): + # TODO + return None + + def _got_results(self, datavs, peerid, readsize): + self._queries_outstanding.discard(peerid) + self._used_peers.add(peerid) + if not self._running: + return + + for shnum,datav in datavs.items(): + data = datav[0] + (seqnum, root_hash, k, N, segsize, datalength, + pubkey_s, signature, prefix) = unpack_prefix_and_signature(data) + + if not self._pubkey: + fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) + if fingerprint != self._node._fingerprint: + # bad share + raise CorruptShareError(peerid, + "pubkey doesn't match fingerprint") + self._pubkey = self._deserialize_pubkey(pubkey_s) + + verinfo = (seqnum, root_hash) + if verinfo not in self._valid_versions: + # it's a new pair. Verify the signature. + valid = self._pubkey.verify(prefix, signature) + if not valid: + raise CorruptShareError(peerid, + "signature is invalid") + # ok, it's a valid verinfo. Add it to the list of validated + # versions. + self._valid_versions[verinfo] = (prefix, DictOfSets()) + + # and make a note of the other parameters we've just learned + if self._required_shares is None: + self._required_shares = k + if self._total_shares is None: + self._total_shares = N + if self._segsize is None: + self._segsize = segsize + if self._datalength is None: + self._datalength = datalength + + # we've already seen this pair, and checked the signature so we + # know it's a valid candidate. Accumulate the share info, if + # there's enough data present. If not, raise NeedMoreDataError, + # which will trigger a re-fetch. + _ignored = unpack_share(data) + self._valid_versions[verinfo][1].add(shnum, (peerid, data)) + + self._check_for_done() + + + def _query_failed(self, f, peerid, stuff): + self._queries_outstanding.discard(peerid) + self._used_peers.add(peerid) + if not self._running: + return + if f.check(NeedMoreDataError): + # ah, just re-send the query then. + self._read_size = max(self._read_size, f.needed_bytes) + (conn, storage_index, peer_storage_servers) = stuff + self._do_query(conn, peerid, storage_index, self._read_size, + peer_storage_servers) + return + self._bad_peerids.add(peerid) + short_sid = idlib.a2b(self.storage_index)[:6] + if f.check(CorruptShareError): + self._node._client.log("WEIRD: bad share for %s: %s" % + (short_sid, f)) + else: + self._node._client.log("WEIRD: other error for %s: %s" % + (short_sid, f)) + self._check_for_done() + + def _check_for_done(self): + share_prefixes = {} + versionmap = DictOfSets() + for prefix, sharemap in self._valid_versions.values(): + if len(sharemap) >= self._required_shares: + # this one looks retrievable + try: + contents = self._extract_data(sharemap) + except CorruptShareError: + # log(WEIRD) + # _extract_data is responsible for removing the bad + # share, so we can just try again + return self._check_for_done() + # success! + return self._done(contents) + # we don't have enough shares yet. Should we send out more queries? + if self._queries_outstanding: + # there are some running, so just wait for them to come back. + # TODO: if our initial guess at k was too low, waiting for these + # responses before sending new queries will increase our latency, + # so we could speed things up by sending new requests earlier. + return + + # no more queries are outstanding. Can we send out more? First, + # should we be looking at more peers? + if self._total_shares is not None: + search_distance = self._total_shares * 2 + else: + search_distance = 20 + if self._peerlist_limit < search_distance: + # we might be able to get some more peers from the list + peers = self._node._client.get_permuted_peers(self._storage_index, + include_myself=True) + self._peerlist = [(p[1],p[2]) + for p in islice(peers, search_distance)] + self._peerlist_limit = search_distance + # are there any peers on the list that we haven't used? + new_query_peers = [] + for (peerid, conn) in self._peerlist: + if peerid not in self._used_peers: + new_query_peers.append( (peerid, conn) ) + if len(new_query_peers) > 5: + # only query in batches of 5. TODO: this is pretty + # arbitrary, really I want this to be something like + # k - max(known_version_sharecounts) + some extra + break + if new_query_peers: + for (peerid, conn) in new_query_peers: + self._do_query(conn, peerid, + self._storage_index, self._read_size, + self._peer_storage_servers) + # we'll retrigger when those queries come back + return + + # we've used up all the peers we're allowed to search. Failure. + return self._done(failure.Failure(NotEnoughPeersError())) + + def _done(self, contents): + self._running = False + eventually(self._done_deferred.callback, contents) + + class DictOfSets(dict): def add(self, key, value): @@ -417,7 +715,7 @@ class Publish: self._new_root_hash = root_hash self._new_shares = final_shares - storage_index = self._node._uri.storage_index + storage_index = self._node.get_storage_index() peerlist = self._node._client.get_permuted_peers(storage_index, include_myself=False) # we don't include ourselves in the N peers, but we *do* push an @@ -425,12 +723,13 @@ class Publish: # the signing key around later. This way, even if all the servers die # and the directory contents are unrecoverable, at least we can still # push out a new copy with brand-new contents. + # TODO: actually push this copy current_share_peers = DictOfSets() reachable_peers = {} EPSILON = total_shares / 2 - partial_peerlist = itertools.islice(peerlist, total_shares + EPSILON) + partial_peerlist = islice(peerlist, total_shares + EPSILON) peer_storage_servers = {} dl = [] for (permutedid, peerid, conn) in partial_peerlist: