self._pubkey = filenode.get_pubkey()
self._storage_index = filenode.get_storage_index()
self._readkey = filenode.get_readkey()
+ self._last_failure = None
def log(self, msg):
- self._node._client.log(msg)
+ #self._node._client.log(msg)
+ pass
def retrieve(self):
"""Retrieve the filenode's current contents. Returns a Deferred that
self._segsize = None
self._datalength = None
+ # self._valid_versions is a dictionary in which the keys are
+ # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
+ # a new potential version of the file, we check its signature, and
+ # the valid ones are added to this dictionary. The values of the
+ # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
+ # the first part of the share (containing the serialized verinfo),
+ # for easier comparison. 'sharemap' is a DictOfSets, in which the
+ # keys are sharenumbers, and the values are sets of (peerid, data)
+ # tuples. There is a (peerid, data) tuple for every instance of a
+ # given share that we've seen. The 'data' in this tuple is a full
+ # copy of the SDMF share, starting with the \x00 version byte and
+ # continuing through the last byte of sharedata.
+ self._valid_versions = {}
+
+ # self._valid_shares is a set (peerid,data) tuples. Each time we
+ # examine the hash chains inside a share and validate them against a
+ # signed root_hash, we add the share to self._valid_shares . We use
+ # this to avoid re-checking the hashes over and over again.
+ self._valid_shares = set()
+
+ self._done_deferred = defer.Deferred()
+
d = defer.succeed(initial_query_count)
d.addCallback(self._choose_initial_peers)
d.addCallback(self._send_initial_requests)
- d.addCallback(lambda res: self._contents)
+ d.addCallback(self._wait_for_finish)
return d
+ def _wait_for_finish(self, res):
+ return self._done_deferred
+
def _choose_initial_peers(self, numqueries):
n = self._node
full_peerlist = n._client.get_permuted_peers(self._storage_index,
self._bad_peerids = set()
self._running = True
self._queries_outstanding = set()
+ self._used_peers = set()
self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
self._peer_storage_servers = {}
dl = []
- for (permutedid, peerid, conn) in peerlist:
+ for (peerid, conn) in peerlist:
self._queries_outstanding.add(peerid)
self._do_query(conn, peerid, self._storage_index, self._read_size,
self._peer_storage_servers)
# control flow beyond this point: state machine. Receiving responses
# from queries is the input. We might send out more queries, or we
# might produce a result.
-
- d = self._done_deferred = defer.Deferred()
- return d
+ return None
def _do_query(self, conn, peerid, storage_index, readsize,
peer_storage_servers):
def _deserialize_pubkey(self, pubkey_s):
# TODO
+ from allmydata.test.test_mutable import FakePubKey
+ return FakePubKey(0)
return None
- def _validate_share(self, root_hash, shnum, data):
- if False:
- raise CorruptShareError("explanation")
- pass
-
def _got_results(self, datavs, peerid, readsize):
self._queries_outstanding.discard(peerid)
self._used_peers.add(peerid)
self._do_query(conn, peerid, storage_index, self._read_size,
peer_storage_servers)
return
+ self._last_failure = f
self._bad_peerids.add(peerid)
- short_sid = idlib.a2b(self.storage_index)[:6]
+ short_sid = idlib.b2a(self._storage_index)[:6]
if f.check(CorruptShareError):
self.log("WEIRD: bad share for %s: %s" % (short_sid, f))
else:
share_prefixes = {}
versionmap = DictOfSets()
for verinfo, (prefix, sharemap) in self._valid_versions.items():
+ # sharemap is a dict that maps shnums to sets of (peerid,data).
+ # len(sharemap) is the number of distinct shares that appear to
+ # be available.
if len(sharemap) >= self._required_shares:
# this one looks retrievable
- d = defer.maybeDeferred(self._extract_data, verinfo, sharemap)
+ d = defer.maybeDeferred(self._attempt_decode, verinfo, sharemap)
def _problem(f):
+ self._last_failure = f
if f.check(CorruptShareError):
# log(WEIRD)
- # _extract_data is responsible for removing the bad
+ # _attempt_decode is responsible for removing the bad
# share, so we can just try again
eventually(self._check_for_done)
return
return
# we've used up all the peers we're allowed to search. Failure.
- return self._done(failure.Failure(NotEnoughPeersError()))
+ e = NotEnoughPeersError("last failure: %s" % self._last_failure)
+ return self._done(failure.Failure(e))
- def _extract_data(self, verinfo, sharemap):
+ def _attempt_decode(self, verinfo, sharemap):
# sharemap is a dict which maps shnum to [(peerid,data)..] sets.
(seqnum, root_hash, IV) = verinfo
# first, validate each share that we haven't validated yet. We use
# self._valid_shares to remember which ones we've already checked.
- self._valid_shares = set() # set of (peerid,data) sets
shares = {}
- for shnum, shareinfo in sharemap.items():
- if shareinfo not in self._valid_shares:
- (peerid,data) = shareinfo
- try:
- # The (seqnum+root_hash+IV) tuple for this share was
- # already verified: specifically, all shares in the
- # sharemap have a (seqnum+root_hash+IV) pair that was
- # present in a validly signed prefix. The remainder of
- # the prefix for this particular share has *not* been
- # validated, but we don't care since we don't use it.
- # self._validate_share() is required to check the hashes
- # on the share data (and hash chains) to make sure they
- # match root_hash, but is not required (and is in fact
- # prohibited, because we don't validate the prefix on all
- # shares) from using anything else in the share.
- sharedata = self._validate_share(root_hash, shnum, data)
- except CorruptShareError, e:
- self.log("WEIRD: share was corrupt: %s" % e)
- sharemap[shnum].discard(shareinfo)
- # If there are enough remaining shares, _check_for_done()
- # will try again
- raise
- self._valid_shares.add(shareinfo)
- shares[shnum] = sharedata
+ for shnum, shareinfos in sharemap.items():
+ for shareinfo in shareinfos:
+ if shareinfo not in self._valid_shares:
+ (peerid,data) = shareinfo
+ try:
+ # The (seqnum+root_hash+IV) tuple for this share was
+ # already verified: specifically, all shares in the
+ # sharemap have a (seqnum+root_hash+IV) pair that was
+ # present in a validly signed prefix. The remainder
+ # of the prefix for this particular share has *not*
+ # been validated, but we don't care since we don't
+ # use it. self._validate_share() is required to check
+ # the hashes on the share data (and hash chains) to
+ # make sure they match root_hash, but is not required
+ # (and is in fact prohibited, because we don't
+ # validate the prefix on all shares) from using
+ # anything else in the share.
+ validator = self._validate_share_and_extract_data
+ sharedata = validator(root_hash, shnum, data)
+ assert isinstance(sharedata, str)
+ except CorruptShareError, e:
+ self.log("WEIRD: share was corrupt: %s" % e)
+ sharemap[shnum].discard(shareinfo)
+ # If there are enough remaining shares,
+ # _check_for_done() will try again
+ raise
+ self._valid_shares.add(shareinfo)
+ shares[shnum] = sharedata
# at this point, all shares in the sharemap are valid, and they're
# all for the same seqnum+root_hash version, so it's now down to
# doing FEC and decrypt.
d.addCallback(self._decrypt, IV)
return d
+ def _validate_share_and_extract_data(self, root_hash, shnum, data):
+ # 'data' is the whole SMDF share
+ self.log("_validate_share_and_extract_data[%d]" % shnum)
+ assert data[0] == "\x00"
+ pieces = unpack_share(data)
+ (seqnum, root_hash, IV, k, N, segsize, datalen,
+ pubkey, signature, share_hash_chain, block_hash_tree,
+ share_data, enc_privkey) = pieces
+
+ assert isinstance(share_data, str)
+ # build the block hash tree. SDMF has only one leaf.
+ leaves = [hashutil.block_hash(share_data)]
+ t = hashtree.HashTree(leaves)
+ if list(t) != block_hash_tree:
+ raise CorruptShareError("block hash tree failure")
+ share_hash_leaf = t[0]
+ # t2 = hashtree.IncompleteHashTree()
+ # TODO: use shnum, share_hash_leaf, share_hash_chain to compare against
+ # root_hash
+ #if False:
+ # raise CorruptShareError("explanation")
+ self.log(" data valid! len=%d" % len(share_data))
+ return share_data
+
def _decode(self, shares_dict):
+ # we ought to know these values by now
+ assert self._segsize is not None
+ assert self._required_shares is not None
+ assert self._total_shares is not None
+
# shares_dict is a dict mapping shnum to share data, but the codec
# wants two lists.
shareids = []; shares = []
shareids.append(shareid)
shares.append(share)
+ # zfec really doesn't want extra shares
+ shareids = shareids[:self._required_shares]
+ shares = shares[:self._required_shares]
+
fec = codec.CRSDecoder()
- # we ought to know these values by now
- assert self._segsize is not None
- assert self._required_shares is not None
- assert self._total_shares is not None
params = "%d-%d-%d" % (self._segsize,
self._required_shares, self._total_shares)
fec.set_serialized_params(params)
- d = fec.decode(shares, shareids)
+ self.log("params %s, we have %d shares" % (params, len(shares)))
+ self.log("about to decode, shareids=%s" % (shareids,))
+ d = defer.maybeDeferred(fec.decode, shares, shareids)
def _done(buffers):
+ self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
segment = segment[:self._datalength]
+ self.log(" segment len=%d" % len(segment))
return segment
+ def _err(f):
+ self.log(" decode failed: %s" % f)
+ return f
d.addCallback(_done)
+ d.addErrback(_err)
return d
def _decrypt(self, crypttext, IV):
return plaintext
def _done(self, contents):
+ self.log("DONE, contents: %r" % contents)
self._running = False
eventually(self._done_deferred.callback, contents)
def __init__(self, filenode):
self._node = filenode
+ def log(self, msg):
+ prefix = idlib.b2a(self._node.get_storage_index())[:6]
+ #self._node._client.log("%s: %s" % (prefix, msg))
+
def publish(self, newdata):
"""Publish the filenode's current contents. Returns a Deferred that
fires (with None) when the publish has done as much work as it's ever
# 4a: may need to run recovery algorithm
# 5: when enough responses are back, we're done
+ self.log("starting publish")
+
old_roothash = self._node._current_roothash
old_seqnum = self._node._current_seqnum
def _encrypt_and_encode(self, newdata, readkey, IV,
required_shares, total_shares):
+ self.log("_encrypt_and_encode")
+
key = hashutil.ssk_readkey_data_hash(IV, readkey)
enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
crypttext = enc.encrypt(newdata)
required_shares, total_shares,
segment_size, data_length, IV),
seqnum, privkey, encprivkey, pubkey):
+ self.log("_generate_shares")
(shares, share_ids) = shares_and_shareids
def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
+ self.log("_query_peers")
+
self._new_seqnum = seqnum
self._new_root_hash = root_hash
self._new_shares = final_shares
def _got_query_results(self, datavs, peerid, permutedid,
reachable_peers, current_share_peers):
+ self.log("_got_query_results")
+
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
for shnum, datav in datavs.items():
def _got_all_query_results(self, res,
total_shares, reachable_peers, new_seqnum,
current_share_peers, peer_storage_servers):
+ self.log("_got_all_query_results")
# now that we know everything about the shares currently out there,
# decide where to place the new shares.
return (target_map, peer_storage_servers)
def _send_shares(self, (target_map, peer_storage_servers), IV ):
+ self.log("_send_shares")
# we're finally ready to send out our shares. If we encounter any
# surprises here, it's because somebody else is writing at the same
# time. (Note: in the future, when we remove the _query_peers() step
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
dispatch_map):
+ self.log("_got_write_answer: %r" % (answer,))
wrote, read_data = answer
surprised = False
self._surprised = True
def _maybe_recover(self, (surprised, dispatch_map)):
+ self.log("_maybe_recover")
if not surprised:
return
print "RECOVERY NOT YET IMPLEMENTED"
self._writekey = self._uri.writekey
self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index
+ self._fingerprint = self._uri.fingerprint
return self
def create(self, initial_contents):
raise NotImplementedError
def download_to_data(self):
- #downloader = self._client.getServiceNamed("downloader")
- #return downloader.download_to_data(self.uri)
- return defer.succeed("this isn't going to fool you, is it")
+ r = Retrieve(self)
+ return r.retrieve()
def replace(self, newdata):
return defer.succeed(None)