-import os, struct, itertools
+import os, struct
+from itertools import islice
from zope.interface import implements
from twisted.internet import defer
+from twisted.python import failure
+from foolscap.eventual import eventually
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
-from allmydata.util import hashutil, mathutil
+from allmydata.util import hashutil, mathutil, idlib
+from allmydata.encode import NotEnoughPeersError
from allmydata.uri import WriteableSSKFileURI
from allmydata.Crypto.Cipher import AES
from allmydata import hashtree, codec
class UncoordinatedWriteError(Exception):
pass
+class CorruptShareError(Exception):
+ def __init__(self, peerid, shnum, reason):
+ self.peerid = peerid
+ self.shnum = shnum
+ self.reason = reason
+ def __repr__(self):
+ # TODO: in some places we use idlib.b2a, in others (foolscap) we use
+ # stdlib b32encode. Fix this discrepancy.
+ short_peerid = idlib.b2a(self.peerid)[:8]
+        short_peerid = idlib.b2a(self.peerid)[:8]
+        return "<CorruptShareError peerid=%s shnum[%d]: %s>" % (short_peerid,
+                                                                self.shnum,
+                                                                self.reason)
+
+class NeedMoreDataError(Exception):
+    # raised by unpack_prefix_and_signature / unpack_share when the data
+    # fetched so far is too short. needed_bytes tells the caller how much
+    # to fetch on retry.
+    def __init__(self, needed_bytes):
+        Exception.__init__(self)
+        self.needed_bytes = needed_bytes
+
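+# Share layout implied by the header format below (">BQ32s BBQQ LLLLLQQ"):
+#   offset  0:  1-byte version number (0)
+#   offset  1:  8-byte sequence number
+#   offset  9:  32-byte root hash
+#   offset 41:  1-byte k (required shares)
+#   offset 42:  1-byte N (total shares)
+#   offset 43:  8-byte segment size
+#   offset 51:  8-byte data length
+# followed by five 4-byte offsets (signature, share_hash_chain,
+# block_hash_tree, IV, share_data) and two 8-byte offsets (enc_privkey,
+# EOF), each giving the position at which that field begins.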
HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
+def unpack_prefix_and_signature(data):
+ assert len(data) >= HEADER_LENGTH
+ o = {}
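+    # the prefix is the signed portion of the header: everything up through
+    # the data length, but not the offset table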
+ prefix = data[:struct.calcsize(">BQ32s BBQQ")]
+
+ (version,
+ seqnum,
+ root_hash,
+ k, N, segsize, datalen,
+ o['signature'],
+ o['share_hash_chain'],
+ o['block_hash_tree'],
+ o['IV'],
+ o['share_data'],
+ o['enc_privkey'],
+ o['EOF']) = struct.unpack(">BQ32s BBQQ LLLLLQQ",
+ data[:HEADER_LENGTH])
+
+ assert version == 0
+ if len(data) < o['share_hash_chain']:
+ raise NeedMoreDataError(o['share_hash_chain'])
+
+ pubkey_s = data[HEADER_LENGTH:o['signature']]
+ signature = data[o['signature']:o['share_hash_chain']]
+
+ return (seqnum, root_hash, k, N, segsize, datalen,
+ pubkey_s, signature, prefix)
+
def unpack_share(data):
assert len(data) >= HEADER_LENGTH
o = {}
return self._writekey
def get_readkey(self):
return self._readkey
+ def get_storage_index(self):
+ return self._storage_index
def get_privkey(self):
return self._privkey
def get_encprivkey(self):
class Retrieve:
def __init__(self, filenode):
self._node = filenode
+ self._contents = None
+ # if the filenode already has a copy of the pubkey, use it. Otherwise
+ # we'll grab a copy from the first peer we talk to.
+ self._pubkey = filenode.get_pubkey()
+ self._storage_index = filenode.get_storage_index()
+
+ def retrieve(self):
+ """Retrieve the filenode's current contents. Returns a Deferred that
+ fires with a string when the contents have been retrieved."""
+
+ # 1: make a guess as to how many peers we should send requests to. We
+ # want to hear from k+EPSILON (k because we have to, EPSILON extra
+ # because that helps us resist rollback attacks). [TRADEOFF:
+ # EPSILON>0 means extra work] [TODO: implement EPSILON>0]
+ # 2: build the permuted peerlist, taking the first k+E peers
+ # 3: send readv requests to all of them in parallel, asking for the
+ # first 2KB of data from all shares
+ # 4: when the first of the responses comes back, extract information:
+ # 4a: extract the pubkey, hash it, compare against the URI. If this
+ # check fails, log a WEIRD and ignore the peer.
+ # 4b: extract the prefix (seqnum, roothash, k, N, segsize, datalength)
+ # and verify the signature on it. If this is wrong, log a WEIRD
+ # and ignore the peer. Save the prefix string in a dict that's
+ # keyed by (seqnum,roothash) and has (prefixstring, sharemap) as
+ # values. We'll use the prefixstring again later to avoid doing
+ # multiple signature checks
+ # 4c: extract the share size (offset of the last byte of sharedata).
+ # if it is larger than 2k, send new readv requests to pull down
+ # the extra data
+ # 4d: if the extracted 'k' is more than we guessed, rebuild a larger
+ # permuted peerlist and send out more readv requests.
+        # 5: as additional responses come back, extract the prefix and compare
+        #    against the ones we've already seen. If they match, add the
+        #    peerid to the corresponding sharemap dict
+        # 6: [TRADEOFF]: if EPSILON==0, when we get k responses for the
+        #    same (seqnum,roothash) key, attempt to reconstruct that data.
+        #    if EPSILON>0, wait for k+EPSILON responses, then attempt to
+        #    reconstruct the most popular version. If we do not have enough
+        #    shares and there are still requests outstanding, wait. If there
+        #    are no requests outstanding (todo: configurable), send more
+        #    requests. Never send queries to more than 2*N servers. If we've
+        #    run out of servers, fail.
+        # 7: if we discover corrupt shares during the reconstruction process,
+        #    remove that share from the sharemap and start step #6 again.
+
+ initial_query_count = 5
+ self._read_size = 2000
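+        # (2000 bytes is a guess at how much covers the header, pubkey, and
+        # signature; if a share's offsets point beyond what we fetched, the
+        # unpack routines raise NeedMoreDataError and we re-fetch more.)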
+
+ # we might not know how many shares we need yet.
+ self._required_shares = self._node.get_required_shares()
+ self._total_shares = self._node.get_total_shares()
+ self._segsize = None
+ self._datalength = None
+
+ d = defer.succeed(initial_query_count)
+ d.addCallback(self._choose_initial_peers)
+ d.addCallback(self._send_initial_requests)
+ d.addCallback(lambda res: self._contents)
+ return d
+
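+    # Typical use (a sketch; 'filenode' is any IMutableFileNode):
+    #   d = Retrieve(filenode).retrieve()
+    #   d.addCallback(lambda contents: ...)
+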
+ def _choose_initial_peers(self, numqueries):
+ n = self._node
+ full_peerlist = n._client.get_permuted_peers(self._storage_index,
+ include_myself=True)
+ # _peerlist is a list of (peerid,conn) tuples for peers that are
+        # worth talking to. This starts with the first numqueries in the
+ # permuted list. If that's not enough to get us a recoverable
+ # version, we expand this to include the first 2*total_shares peerids
+ # (assuming we learn what total_shares is from one of the first
+ # numqueries peers)
+ self._peerlist = [(p[1],p[2])
+ for p in islice(full_peerlist, numqueries)]
+ # _peerlist_limit is the query limit we used to build this list. If
+ # we later increase this limit, it may be useful to re-scan the
+ # permuted list.
+ self._peerlist_limit = numqueries
+ return self._peerlist
+
+ def _send_initial_requests(self, peerlist):
+        self._bad_peerids = set()
+        self._used_peers = set()
+        self._valid_versions = {} # maps (seqnum,root_hash) -> (prefix, sharemap)
+        self._running = True
+        self._queries_outstanding = set()
+        self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
+        self._peer_storage_servers = {}
+        for (peerid, conn) in peerlist:
+            self._queries_outstanding.add(peerid)
+            self._do_query(conn, peerid, self._storage_index, self._read_size,
+                           self._peer_storage_servers)
+
+ # control flow beyond this point: state machine. Receiving responses
+ # from queries is the input. We might send out more queries, or we
+ # might produce a result.
+
+ d = self._done_deferred = defer.Deferred()
+ return d
+
+ def _do_query(self, conn, peerid, storage_index, readsize,
+ peer_storage_servers):
+ self._queries_outstanding.add(peerid)
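+        # reuse a cached reference to this peer's storage server if we
+        # already have one, otherwise fetch it over the foolscap connection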
+ if peerid in peer_storage_servers:
+ d = defer.succeed(peer_storage_servers[peerid])
+ else:
+ d = conn.callRemote("get_service", "storageserver")
+ def _got_storageserver(ss):
+ peer_storage_servers[peerid] = ss
+ return ss
+ d.addCallback(_got_storageserver)
+ d.addCallback(lambda ss: ss.callRemote("readv_slots", [(0, readsize)]))
+ d.addCallback(self._got_results, peerid, readsize)
+ d.addErrback(self._query_failed, peerid, (conn, storage_index,
+ peer_storage_servers))
+ return d
+
+ def _deserialize_pubkey(self, pubkey_s):
+ # TODO
+ return None
+
+ def _got_results(self, datavs, peerid, readsize):
+ self._queries_outstanding.discard(peerid)
+ self._used_peers.add(peerid)
+ if not self._running:
+ return
+
+ for shnum,datav in datavs.items():
+ data = datav[0]
+ (seqnum, root_hash, k, N, segsize, datalength,
+ pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
+
+ if not self._pubkey:
+ fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+ if fingerprint != self._node._fingerprint:
+ # bad share
+                    raise CorruptShareError(peerid, shnum,
+                                            "pubkey doesn't match fingerprint")
+ self._pubkey = self._deserialize_pubkey(pubkey_s)
+
+ verinfo = (seqnum, root_hash)
+ if verinfo not in self._valid_versions:
+ # it's a new pair. Verify the signature.
+ valid = self._pubkey.verify(prefix, signature)
+ if not valid:
+                    raise CorruptShareError(peerid, shnum,
+                                            "signature is invalid")
+ # ok, it's a valid verinfo. Add it to the list of validated
+ # versions.
+ self._valid_versions[verinfo] = (prefix, DictOfSets())
+
+ # and make a note of the other parameters we've just learned
+ if self._required_shares is None:
+ self._required_shares = k
+ if self._total_shares is None:
+ self._total_shares = N
+ if self._segsize is None:
+ self._segsize = segsize
+ if self._datalength is None:
+ self._datalength = datalength
+
+            # either way, the (seqnum,root_hash) pair now has a verified
+            # signature, so this share is a valid candidate. Accumulate the
+            # share info, if there's enough data present. If not, raise
+            # NeedMoreDataError, which will trigger a re-fetch.
+ _ignored = unpack_share(data)
+ self._valid_versions[verinfo][1].add(shnum, (peerid, data))
+
+ self._check_for_done()
+
+
+ def _query_failed(self, f, peerid, stuff):
+ self._queries_outstanding.discard(peerid)
+ self._used_peers.add(peerid)
+ if not self._running:
+ return
+ if f.check(NeedMoreDataError):
+ # ah, just re-send the query then.
+            self._read_size = max(self._read_size, f.value.needed_bytes)
+ (conn, storage_index, peer_storage_servers) = stuff
+ self._do_query(conn, peerid, storage_index, self._read_size,
+ peer_storage_servers)
+ return
+ self._bad_peerids.add(peerid)
+        short_sid = idlib.b2a(self._storage_index)[:6]
+ if f.check(CorruptShareError):
+ self._node._client.log("WEIRD: bad share for %s: %s" %
+ (short_sid, f))
+ else:
+ self._node._client.log("WEIRD: other error for %s: %s" %
+ (short_sid, f))
+ self._check_for_done()
+
+ def _check_for_done(self):
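+        # Decide whether we are finished: reconstruct as soon as some
+        # validated version has at least k shares; otherwise wait for
+        # outstanding queries, widen the peer search, or give up.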
+ for prefix, sharemap in self._valid_versions.values():
+ if len(sharemap) >= self._required_shares:
+ # this one looks retrievable
+ try:
+ contents = self._extract_data(sharemap)
+ except CorruptShareError:
+ # log(WEIRD)
+ # _extract_data is responsible for removing the bad
+ # share, so we can just try again
+ return self._check_for_done()
+ # success!
+ return self._done(contents)
+ # we don't have enough shares yet. Should we send out more queries?
+ if self._queries_outstanding:
+ # there are some running, so just wait for them to come back.
+ # TODO: if our initial guess at k was too low, waiting for these
+ # responses before sending new queries will increase our latency,
+ # so we could speed things up by sending new requests earlier.
+ return
+
+ # no more queries are outstanding. Can we send out more? First,
+ # should we be looking at more peers?
+ if self._total_shares is not None:
+ search_distance = self._total_shares * 2
+ else:
+ search_distance = 20
+ if self._peerlist_limit < search_distance:
+ # we might be able to get some more peers from the list
+ peers = self._node._client.get_permuted_peers(self._storage_index,
+ include_myself=True)
+ self._peerlist = [(p[1],p[2])
+ for p in islice(peers, search_distance)]
+ self._peerlist_limit = search_distance
+ # are there any peers on the list that we haven't used?
+ new_query_peers = []
+ for (peerid, conn) in self._peerlist:
+ if peerid not in self._used_peers:
+ new_query_peers.append( (peerid, conn) )
+                if len(new_query_peers) >= 5:
+ # only query in batches of 5. TODO: this is pretty
+ # arbitrary, really I want this to be something like
+ # k - max(known_version_sharecounts) + some extra
+ break
+ if new_query_peers:
+ for (peerid, conn) in new_query_peers:
+ self._do_query(conn, peerid,
+ self._storage_index, self._read_size,
+ self._peer_storage_servers)
+ # we'll retrigger when those queries come back
+ return
+
+ # we've used up all the peers we're allowed to search. Failure.
+ return self._done(failure.Failure(NotEnoughPeersError()))
+
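+    # foolscap's eventually() fires _done_deferred on a later reactor turn,
+    # so callers always see the Deferred fire after retrieve() has returned,
+    # even when every query completes synchronously.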
+    def _done(self, contents):
+        self._running = False
+        self._contents = contents
+        eventually(self._done_deferred.callback, contents)
+
+
class DictOfSets(dict):
def add(self, key, value):
self._new_root_hash = root_hash
self._new_shares = final_shares
- storage_index = self._node._uri.storage_index
+ storage_index = self._node.get_storage_index()
peerlist = self._node._client.get_permuted_peers(storage_index,
include_myself=False)
        # we don't include ourselves in the N peers, but we *do* push an
        # extra copy to ourselves, so we're more likely to have the signing
        # key around later. This way, even if all the servers die
# and the directory contents are unrecoverable, at least we can still
# push out a new copy with brand-new contents.
+ # TODO: actually push this copy
current_share_peers = DictOfSets()
reachable_peers = {}
EPSILON = total_shares / 2
- partial_peerlist = itertools.islice(peerlist, total_shares + EPSILON)
+ partial_peerlist = islice(peerlist, total_shares + EPSILON)
peer_storage_servers = {}
dl = []
for (permutedid, peerid, conn) in partial_peerlist: