from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \
UncoordinatedWriteError, NotEnoughServersError
from allmydata.mutable.servermap import ServerMap
-from allmydata.mutable.layout import unpack_checkstring, MDMFSlotWriteProxy, \
- SDMFSlotWriteProxy
+from allmydata.mutable.layout import get_version_from_checkstring,\
+ unpack_mdmf_checkstring, \
+ unpack_sdmf_checkstring, \
+ MDMFSlotWriteProxy, \
+ SDMFSlotWriteProxy, MDMFCHECKSTRING
KiB = 1024
DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
# existing servermap.
self.goal = set() # pairs of (peerid, shnum) tuples
- # the second table is our list of outstanding queries: those which
- # are in flight and may or may not be delivered, accepted, or
- # acknowledged. Items are added to this table when the request is
- # sent, and removed when the response returns (or errbacks).
- self.outstanding = set() # (peerid, shnum) tuples
+ # the number of outstanding queries: those that are in flight and
+ # may or may not be delivered, accepted, or acknowledged. This is
+ # incremented when a query is sent, and decremented when the response
+ # returns or errbacks.
+ self.num_outstanding = 0
# the third is a table of successes: share which have actually been
# placed. These are populated when responses come back with success.
# existing servermap.
self.goal = set() # pairs of (peerid, shnum) tuples
- # the second table is our list of outstanding queries: those which
- # are in flight and may or may not be delivered, accepted, or
- # acknowledged. Items are added to this table when the request is
- # sent, and removed when the response returns (or errbacks).
- self.outstanding = set() # (peerid, shnum) tuples
+ # the number of outstanding queries: those that are in flight and
+ # may or may not be delivered, accepted, or acknowledged. This is
+ # incremented when a query is sent, and decremented when the response
+ # returns or errbacks.
+ self.num_outstanding = 0
# the third is a table of successes: share which have actually been
# placed. These are populated when responses come back with success.
"%d messages outstanding" %
(len(self.placed),
len(self.goal),
- len(self.outstanding)))
+ self.num_outstanding))
self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
ds = []
verification_key = self._pubkey.serialize()
-
- # TODO: Bad, since we remove from this same dict. We need to
- # make a copy, or just use a non-iterated value.
- for (shnum, writer) in self.writers.iteritems():
+ for (shnum, writer) in self.writers.copy().iteritems():
writer.put_verification_key(verification_key)
+ self.num_outstanding += 1
+ def _no_longer_outstanding(res):
+ self.num_outstanding -= 1
+ return res
+
d = writer.finish_publishing()
- # Add the (peerid, shnum) tuple to our list of outstanding
- # queries. This gets used by _loop if some of our queries
- # fail to place shares.
- self.outstanding.add((writer.peerid, writer.shnum))
- d.addCallback(self._got_write_answer, writer, started)
+ d.addBoth(_no_longer_outstanding)
d.addErrback(self._connection_problem, writer)
+ d.addCallback(self._got_write_answer, writer, started)
ds.append(d)
self._record_verinfo()
self._status.timings['pack'] = time.time() - started
self.surprised = True
self.bad_peers.add(writer) # don't ask them again
# use the checkstring to add information to the log message
+ unknown_format = False
for (shnum,readv) in read_data.items():
checkstring = readv[0]
- (other_seqnum,
- other_roothash,
- other_salt) = unpack_checkstring(checkstring)
+ version = get_version_from_checkstring(checkstring)
+ if version == MDMF_VERSION:
+ (other_seqnum,
+ other_roothash) = unpack_mdmf_checkstring(checkstring)
+ elif version == SDMF_VERSION:
+ (other_seqnum,
+ other_roothash,
+ other_salt) = unpack_sdmf_checkstring(checkstring)
+ else:
+ unknown_format = True
expected_version = self._servermap.version_on_peer(peerid,
shnum)
if expected_version:
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = expected_version
- self.log("somebody modified the share on us:"
- " shnum=%d: I thought they had #%d:R=%s,"
- " but testv reported #%d:R=%s" %
- (shnum,
- seqnum, base32.b2a(root_hash)[:4],
- other_seqnum, base32.b2a(other_roothash)[:4]),
- parent=lp, level=log.NOISY)
+ msg = ("somebody modified the share on us:"
+ " shnum=%d: I thought they had #%d:R=%s," %
+ (shnum,
+ seqnum, base32.b2a(root_hash)[:4]))
+ if unknown_format:
+ msg += (" but I don't know how to read share"
+ " format %d" % version)
+ else:
+ msg += " but testv reported #%d:R=%s" % \
+ (other_seqnum, other_roothash)
+ self.log(msg, parent=lp, level=log.NOISY)
# if expected_version==None, then we didn't expect to see a
# share on that peer, and the 'surprise_shares' clause above
# will have logged it.