num = self._node._client.log("Publish(%s): starting" % prefix)
self._log_number = num
- def log(self, msg, parent=None, **kwargs):
- prefix = self._log_prefix
- if parent is None:
- parent = self._log_number
- num = self._node._client.log("Publish(%s): %s" % (prefix, msg),
- parent=parent, **kwargs)
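+ # pass-through to log.msg; default 'parent' to this Publish
+ # operation's top-level entry so all messages nest under it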
+ def log(self, *args, **kwargs):
+ if 'parent' not in kwargs:
+ kwargs['parent'] = self._log_number
+ num = log.msg(*args, **kwargs)
return num
- def log_err(self, f):
- num = log.err(f, parent=self._log_number)
+ def log_err(self, *args, **kwargs):
+ if 'parent' not in kwargs:
+ kwargs['parent'] = self._log_number
+ num = log.err(*args, **kwargs)
return num
def publish(self, newdata, wait_for_numpeers=None):
def _got_query_results(self, datavs, peerid, permutedid,
reachable_peers, current_share_peers):
- lp = self.log("_got_query_results from %s" %
- idlib.shortnodeid_b2a(peerid))
+ lp = self.log(format="_got_query_results from %(peerid)s",
+ peerid=idlib.shortnodeid_b2a(peerid))
assert isinstance(datavs, dict)
reachable_peers[peerid] = permutedid
if not datavs:
self.log("peer has no shares", parent=lp)
for shnum, datav in datavs.items():
- self.log("peer has shnum %d" % shnum, parent=lp)
+ lp2 = self.log("peer has shnum %d" % shnum, parent=lp)
assert len(datav) == 1
data = datav[0]
# We want (seqnum, root_hash, IV) from all servers to know what
# self._pubkey is present because we require read-before-replace
valid = self._pubkey.verify(prefix, signature)
if not valid:
- self.log("bad signature from %s shnum %d" %
- (shnum, idlib.shortnodeid_b2a(peerid)),
- parent=lp, level=log.WEIRD)
+ self.log(format="bad signature from %(peerid)s shnum %(shnum)d",
+ peerid=idlib.shortnodeid_b2a(peerid), shnum=shnum,
+ parent=lp2, level=log.WEIRD)
continue
+ self.log(format="peer has goodsig shnum %(shnum)d seqnum %(seqnum)d",
+ shnum=shnum, seqnum=seqnum,
+ parent=lp2, level=log.NOISY)
share = (shnum, seqnum, root_hash)
current_share_peers.add(shnum, (peerid, seqnum, root_hash) )
# TODO: 2: move those shares instead of copying them, to reduce future
# update work
+ # if log.recording_noisy
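+ # summarize which peers already hold each share (old seqnum and
+ # abbreviated root hash) before planning where the new shares go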
+ logmsg = []
+ for shnum in range(total_shares):
+ logmsg2 = []
+ for oldplace in current_share_peers.get(shnum, []):
+ (peerid, seqnum, R) = oldplace
+ logmsg2.append("%s:#%d:R=%s" % (idlib.shortnodeid_b2a(peerid),
+ seqnum, idlib.b2a(R)[:4]))
+ logmsg.append("sh%d on (%s)" % (shnum, "/".join(logmsg2)))
+ self.log("sharemap: %s" % (", ".join(logmsg)), level=log.NOISY)
+ self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
+ level=log.NOISY)
+
shares_needing_homes = range(total_shares)
target_map = DictOfSets() # maps shnum to set((peerid,oldseqnum,oldR))
shares_per_peer = DictOfSets()
for oldplace in current_share_peers.get(shnum, []):
(peerid, seqnum, R) = oldplace
if seqnum >= self._new_seqnum:
- raise UncoordinatedWriteError()
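+ # a peer already holds a share with a seqnum at least as new as
+ # ours: log the collision, then abort this publish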
+ self.log("somebody has a newer sequence number than what we were uploading",
+ level=log.WEIRD)
+ self.log(format="peerid=%(peerid)s, theirs=%(seqnum)d, mine=%(new_seqnum)d",
+ peerid=idlib.shortnodeid_b2a(peerid),
+ seqnum=seqnum,
+ new_seqnum=self._new_seqnum)
+ raise UncoordinatedWriteError("somebody has a newer sequence number than us")
target_map.add(shnum, oldplace)
shares_per_peer.add(peerid, shnum)
if shnum in shares_needing_homes:
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
dispatch_map):
- self.log("_got_write_answer from %s" % idlib.shortnodeid_b2a(peerid))
+ lp = self.log("_got_write_answer from %s" %
+ idlib.shortnodeid_b2a(peerid))
wrote, read_data = answer
surprised = False
dispatch_map.add(shnum, (peerid, new_seqnum, new_root_hash))
else:
# surprise! our testv failed, so the write did not happen
+ self.log("our testv failed, that write did not happen",
+ parent=lp, level=log.WEIRD)
surprised = True
for shnum, (old_cs,) in read_data.items():
if shnum not in expected_old_shares:
# surprise! there was a share we didn't know about
+ self.log("they had share %d that we didn't know about" % shnum,
+ parent=lp, level=log.WEIRD)
surprised = True
else:
seqnum, root_hash = expected_old_shares[shnum]
if seqnum is not None:
if seqnum != old_seqnum or root_hash != old_root_hash:
# surprise! somebody modified the share on us
+ self.log("somebody modified the share on us:"
+ " shnum=%d: I thought they had #%d:R=%s,"
+ " but testv reported #%d:R=%s" %
+ (shnum,
+ seqnum, idlib.b2a(root_hash)[:4],
+ old_seqnum, idlib.b2a(old_root_hash)[:4]),
+ parent=lp, level=log.WEIRD)
surprised = True
if surprised:
self._surprised = True
seqnum,
idlib.b2a(root_hash)[:4])
for (peerid,seqnum,root_hash) in places]
- self.log(" share %d sent to: %s" % (shnum, sent_to))
+ self.log(" share %d sent to: %s" % (shnum, sent_to),
+ level=log.NOISY)
def _maybe_recover(self, (surprised, dispatch_map)):
- self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised)
+ self.log("_maybe_recover, surprised=%s, dispatch_map:" % surprised,
+ level=log.NOISY)
self._log_dispatch_map(dispatch_map)
if not surprised:
self.log(" no recovery needed")
return
+ self.log("We need recovery!", level=log.WEIRD)
print "RECOVERY NOT YET IMPLEMENTED"
# but dispatch_map will help us do it
raise UncoordinatedWriteError("I was surprised!")