         return (seqnum, root_hash, final_shares)
+    def _pack_checkstring(self, seqnum, root_hash):
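+        # the checkstring is 41 bytes: one version byte (0), the 8-byte
+        # big-endian sequence number, and the 32-byte root hash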
+        return struct.pack(">BQ32s",
+                           0, # version,
+                           seqnum,
+                           root_hash)
+
     def _pack_prefix(self, seqnum, root_hash,
                      required_shares, total_shares,
                      segment_size, data_length):
                 (peerid, seqnum, R) = oldplace
                 if seqnum >= new_seqnum:
                     raise UncoordinatedWriteError()
-                target_map[shnum].add(oldplace)
+                target_map.add(shnum, oldplace)
                 shares_per_peer.add(peerid, shnum)
                 if shnum in shares_needing_homes:
                     shares_needing_homes.remove(shnum)
         assert not shares_needing_homes
-        return target_map
+        return (target_map, peer_storage_servers)
+
+    def _send_shares(self, (target_map, peer_storage_servers) ):
+        # we're finally ready to send out our shares. If we encounter any
+        # surprises here, it's because somebody else is writing at the same
+        # time. (Note: in the future, when we remove the _query_peers() step
+        # and instead speculate about [or remember] which shares are where,
+        # surprises here are *not* indications of UncoordinatedWriteError,
+        # and we'll need to respond to them more gracefully.)
+
+        my_checkstring = self._pack_checkstring(self._new_seqnum,
+                                                self._new_root_hash)
+        peer_messages = {}
+        expected_old_shares = {}
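+        # roughly, the two dictionaries built below look like:
+        #   peer_messages:       { peerid: { shnum: (testv, writev, None) } }
+        #   expected_old_shares: { peerid: { shnum: (old_seqnum, old_root_hash) } }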
+
+        for shnum, peers in target_map.items():
+            for (peerid, old_seqnum, old_root_hash) in peers:
+                testv = [(0, len(my_checkstring), "ge", my_checkstring)]
+                new_share = self._new_shares[shnum]
+                writev = [(0, new_share)]
+                if peerid not in peer_messages:
+                    peer_messages[peerid] = {}
+                peer_messages[peerid][shnum] = (testv, writev, None)
+                if peerid not in expected_old_shares:
+                    expected_old_shares[peerid] = {}
+                expected_old_shares[peerid][shnum] = (old_seqnum, old_root_hash)
+
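+        # read back the existing checkstring (the first 41 bytes) of each
+        # share the peer holds, so _got_write_answer can notice shares or
+        # versions we did not expect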
+        read_vector = [(0, len(my_checkstring))]
+
+        dl = []
+        # ok, send the messages!
+        self._surprised = False
+        for peerid, tw_vectors in peer_messages.items():
+            d = self._do_testreadwrite(peerid, peer_storage_servers,
+                                       tw_vectors, read_vector)
+            d.addCallback(self._got_write_answer,
+                          peerid, expected_old_shares[peerid])
+            dl.append(d)
+
+        d = defer.DeferredList(dl)
+        d.addCallback(lambda res: self._surprised)
+        return d
+
+    def _do_testreadwrite(self, peerid, peer_storage_servers,
+                          tw_vectors, read_vector):
+        conn = peer_storage_servers[peerid]
+        storage_index = self._node._uri.storage_index
+        # TOTALLY BOGUS renew/cancel secrets
+        write_enabler = hashutil.tagged_hash("WEFOO", storage_index)
+        renew_secret = hashutil.tagged_hash("renewFOO", storage_index)
+        cancel_secret = hashutil.tagged_hash("cancelFOO", storage_index)
+
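+        # tw_vectors maps shnum -> (testv, writev, new_length); each test
+        # entry is (offset, length, operator, specimen), each write entry is
+        # (offset, data), and read_vector is a list of (offset, length) pairs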
+        d = conn.callRemote("slot_testv_and_readv_and_writev",
+                            storage_index,
+                            (write_enabler, renew_secret, cancel_secret),
+                            tw_vectors,
+                            read_vector)
+        return d
+
+    def _got_write_answer(self, answer, peerid, expected_old_shares):
+        wrote, read_data = answer
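+        # the answer is (wrote, read_data): wrote is False if any test vector
+        # failed (so nothing was written), and read_data maps shnum to the
+        # strings produced by our read vector -- here one checkstring apiece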
+        surprised = False
+        if not wrote:
+            # surprise! our testv failed, so the write did not happen
+            surprised = True
+        for shnum, (old_checkstring,) in read_data.items():
+            if shnum not in expected_old_shares:
+                # surprise! there was a share we didn't know about
+                surprised = True
+            else:
+                seqnum, root_hash = expected_old_shares[shnum]
+                if seqnum is not None:
+                    expected_checkstring = self._pack_checkstring(seqnum,
+                                                                  root_hash)
+                    if old_checkstring != expected_checkstring:
+                        # surprise! somebody modified the share
+                        surprised = True
+        if surprised:
+            self._surprised = True
+
         shares = self._peers[peerid]
         return defer.succeed(shares)
+    def _do_testreadwrite(self, conn, peerid, tw_vectors, read_vector):
+        # always-pass: parrot the test vectors back to them.
+        readv = {}
+        for shnum, (testv, datav, new_length) in tw_vectors.items():
+            for (offset, length, op, specimen) in testv:
+                assert op in ("le", "eq", "ge")
+            readv[shnum] = [ specimen
+                             for (offset, length, op, specimen)
+                             in testv ]
+        answer = (True, readv)
+        return defer.succeed(answer)
+
 class FakeNewDirectoryNode(dirnode2.NewDirectoryNode):
     filenode_class = FakeFilenode
         total_shares = 10
         d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
                             total_shares)
-        def _done(target_map):
+        def _done( (target_map, peer_storage_servers) ):
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
         total_shares = 10
         d = p._query_peers( (new_seqnum, new_root_hash, new_seqnum),
                             total_shares)
-        def _done(target_map):
+        def _done( (target_map, peer_storage_servers) ):
             shares_per_peer = {}
             for shnum in target_map:
                 for (peerid, old_seqnum, old_R) in target_map[shnum]:
                             total_shares)
         return d
+    def setup_for_write(self, num_peers, total_shares):
+        c, p = self.setup_for_sharemap(num_peers)
+        # make some fake shares
+        CONTENTS = "some initial contents"
+        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
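+        # ten seven-character fake share strings, paired with shnums 0 through 9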
+        d = defer.maybeDeferred(p._generate_shares,
+                                (shares_and_ids,
+                                 3, total_shares,
+                                 21, # segsize
+                                 len(CONTENTS),
+                                 "IV"*8),
+                                3, # seqnum
+                                FakePrivKey(), "encprivkey", FakePubKey(),
+                                )
+        return d, p
+
+    def test_write(self):
+        total_shares = 10
+        d, p = self.setup_for_write(20, total_shares)
+        d.addCallback(p._query_peers, total_shares)
+        d.addCallback(p._send_shares)
+        def _done(surprised):
+            self.failIf(surprised, "surprised!")
+        d.addCallback(_done)
+        return d
+
+
 class FakePubKey:
     def serialize(self):