From: Brian Warner Date: Tue, 6 Nov 2007 04:29:47 +0000 (-0700) Subject: mutable: added send-messages-to-peers code, about 70% done. No recovery code yet. X-Git-Tag: allmydata-tahoe-0.7.0~285 X-Git-Url: https://git.rkrishnan.org/COPYING.GPL?a=commitdiff_plain;h=fade06ef4dc85becef6796d8d96f5fab4c7b8835;p=tahoe-lafs%2Ftahoe-lafs.git mutable: added send-messages-to-peers code, about 70% done. No recovery code yet. --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index d4fa1cbf..a77e3e98 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -307,6 +307,12 @@ class Publish(ShareFormattingMixin): return (seqnum, root_hash, final_shares) + def _pack_checkstring(self, seqnum, root_hash): + return struct.pack(">BQ32s", + 0, # version, + seqnum, + root_hash) + def _pack_prefix(self, seqnum, root_hash, required_shares, total_shares, segment_size, data_length): @@ -419,7 +425,7 @@ class Publish(ShareFormattingMixin): (peerid, seqnum, R) = oldplace if seqnum >= new_seqnum: raise UncoordinatedWriteError() - target_map[shnum].add(oldplace) + target_map.add(shnum, oldplace) shares_per_peer.add(peerid, shnum) if shnum in shares_needing_homes: shares_needing_homes.remove(shnum) @@ -444,4 +450,83 @@ class Publish(ShareFormattingMixin): assert not shares_needing_homes - return target_map + return (target_map, peer_storage_servers) + + def _send_shares(self, (target_map, peer_storage_servers) ): + # we're finally ready to send out our shares. If we encounter any + # surprises here, it's because somebody else is writing at the same + # time. (Note: in the future, when we remove the _query_peers() step + # and instead speculate about [or remember] which shares are where, + # surprises here are *not* indications of UncoordinatedWriteError, + # and we'll need to respond to them more gracefully. + + my_checkstring = self._pack_checkstring(self._new_seqnum, + self._new_root_hash) + peer_messages = {} + expected_old_shares = {} + + for shnum, peers in target_map.items(): + for (peerid, old_seqnum, old_root_hash) in peers: + testv = [(0, len(my_checkstring), "ge", my_checkstring)] + new_share = self._new_shares[shnum] + writev = [(0, new_share)] + if peerid not in peer_messages: + peer_messages[peerid] = {} + peer_messages[peerid][shnum] = (testv, writev, None) + if peerid not in expected_old_shares: + expected_old_shares[peerid] = {} + expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash) + + read_vector = [(0, len(my_checkstring))] + + dl = [] + # ok, send the messages! + self._surprised = False + for peerid, tw_vectors in peer_messages.items(): + d = self._do_testreadwrite(peerid, peer_storage_servers, + tw_vectors, read_vector) + d.addCallback(self._got_write_answer, + peerid, expected_old_shares[peerid]) + dl.append(d) + + d = defer.DeferredList(dl) + d.addCallback(lambda res: self._surprised) + return d + + def _do_testreadwrite(self, peerid, peer_storage_servers, + tw_vectors, read_vector): + conn = peer_storage_servers[peerid] + storage_index = self._node._uri.storage_index + # TOTALLY BOGUS renew/cancel secrets + write_enabler = hashutil.tagged_hash("WEFOO", storage_index) + renew_secret = hashutil.tagged_hash("renewFOO", storage_index) + cancel_secret = hashutil.tagged_hash("cancelFOO", storage_index) + + d = conn.callRemote("slot_testv_and_readv_and_writev", + storage_index, + (write_enabler, renew_secret, cancel_secret), + tw_vectors, + read_vector) + return d + + def _got_write_answer(self, answer, peerid, expected_old_shares): + wrote, read_data = answer + surprised = False + if not wrote: + # surprise! our testv failed, so the write did not happen + surprised = True + for shnum, (old_checkstring,) in read_data.items(): + if shnum not in expected_old_shares: + # surprise! there was a share we didn't know about + surprised = True + else: + seqnum, root_hash = expected_old_shares[shnum] + if seqnum is not None: + expected_checkstring = self._pack_checkstring(seqnum, + root_hash) + if old_checkstring != expected_checkstring: + # surprise! somebody modified the share + surprised = True + if surprised: + self._surprised = True + diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index e8cc516f..9acc1e68 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -72,6 +72,18 @@ class FakePublish(mutable.Publish): shares = self._peers[peerid] return defer.succeed(shares) + def _do_testreadwrite(self, conn, peerid, tw_vectors, read_vector): + # always-pass: parrot the test vectors back to them. + readv = {} + for shnum, (testv, datav, new_length) in tw_vectors.items(): + for (offset, length, op, specimen) in testv: + assert op in ("le", "eq", "ge") + readv[shnum] = [ specimen + for (offset, length, op, specimen) + in testv ] + answer = (True, readv) + return defer.succeed(answer) + class FakeNewDirectoryNode(dirnode2.NewDirectoryNode): filenode_class = FakeFilenode @@ -268,7 +280,7 @@ class Publish(unittest.TestCase): total_shares = 10 d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum), total_shares) - def _done(target_map): + def _done( (target_map, peer_storage_servers) ): shares_per_peer = {} for shnum in target_map: for (peerid, old_seqnum, old_R) in target_map[shnum]: @@ -293,7 +305,7 @@ class Publish(unittest.TestCase): total_shares = 10 d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum), total_shares) - def _done(target_map): + def _done( (target_map, peer_storage_servers) ): shares_per_peer = {} for shnum in target_map: for (peerid, old_seqnum, old_R) in target_map[shnum]: @@ -320,6 +332,33 @@ class Publish(unittest.TestCase): total_shares) return d + def setup_for_write(self, num_peers, total_shares): + c, p = self.setup_for_sharemap(num_peers) + # make some fake shares + CONTENTS = "some initial contents" + shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) ) + d = defer.maybeDeferred(p._generate_shares, + (shares_and_ids, + 3, total_shares, + 21, # segsize + len(CONTENTS), + "IV"*8), + 3, # seqnum + FakePrivKey(), "encprivkey", FakePubKey(), + ) + return d, p + + def test_write(self): + total_shares = 10 + d, p = self.setup_for_write(20, total_shares) + d.addCallback(p._query_peers, total_shares) + d.addCallback(p._send_shares) + def _done(surprised): + self.failIf(surprised, "surprised!") + d.addCallback(_done) + return d + + class FakePubKey: def serialize(self):