initial_query_count = 5
- # we might not know how many shares we need yet.
- self._required_shares = self._node.get_required_shares()
- self._total_shares = self._node.get_total_shares()
-
# self._valid_versions is a dictionary in which the keys are
- # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
- # a new potential version of the file, we check its signature, and
- # the valid ones are added to this dictionary. The values of the
- # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
- # the first part of the share (containing the serialized verinfo),
- # for easier comparison. 'sharemap' is a DictOfSets, in which the
- # keys are sharenumbers, and the values are sets of (peerid, data)
- # tuples. There is a (peerid, data) tuple for every instance of a
- # given share that we've seen. The 'data' in this tuple is a full
- # copy of the SDMF share, starting with the \x00 version byte and
- # continuing through the last byte of sharedata.
+ # 'verinfo' tuples (seqnum, root_hash, IV, segsize, datalength, k,
+ # N). Every time we hear about a new potential version of the file,
+ # we check its signature, and the valid ones are added to this
+ # dictionary. The values of the dictionary are (prefix, sharemap)
+ # tuples, where 'prefix' is just the first part of the share
+ # (containing the serialized verinfo), for easier comparison.
+ # 'sharemap' is a DictOfSets, in which the keys are sharenumbers, and
+ # the values are sets of (peerid, data) tuples. There is a (peerid,
+ # data) tuple for every instance of a given share that we've seen.
+ # The 'data' in this tuple is a full copy of the SDMF share, starting
+ # with the \x00 version byte and continuing through the last byte of
+ # sharedata.
self._valid_versions = {}
# self._valid_shares is a dict mapping (peerid,data) tuples to
self._pubkey = self._deserialize_pubkey(pubkey_s)
self._node._populate_pubkey(self._pubkey)
- verinfo = (seqnum, root_hash, IV, segsize, datalength) #, k, N)
+ verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N)
self._status.sharemap[peerid].add(verinfo)
if verinfo not in self._valid_versions:
k, N, segsize, datalength))
self._valid_versions[verinfo] = (prefix, DictOfSets())
- # and make a note of the other parameters we've just learned
- # NOTE: Retrieve needs to be refactored to put k,N in the verinfo
- # along with seqnum/etc, to make sure we don't co-mingle shares
- # from differently-encoded versions of the same file.
- if self._required_shares is None:
- self._required_shares = k
- self._node._populate_required_shares(k)
- if self._total_shares is None:
- self._total_shares = N
- self._node._populate_total_shares(N)
-
- # reject shares that don't match our narrow-minded ideas of what
- # encoding we're going to use. This addresses the immediate needs of
- # ticket #312, by turning the data corruption into unavailability. To
- # get back the availability (i.e. make sure that one weird-encoding
- # share that happens to come back first doesn't make us ignore the
- # rest of the shares), we need to implement the refactoring mentioned
- # above.
- if k != self._required_shares:
- self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \
- % (shnum, k, self._required_shares)
- raise CorruptShareError(peerid, shnum,
- "share has k=%d, we want k=%d" %
- (k, self._required_shares))
-
- if N != self._total_shares:
- self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \
- % (shnum, N, self._total_shares)
- raise CorruptShareError(peerid, shnum,
- "share has N=%d, we want N=%d" %
- (N, self._total_shares))
-
- # we've already seen this pair, and checked the signature so we
- # know it's a valid candidate. Accumulate the share info, if
- # there's enough data present. If not, raise NeedMoreDataError,
- # which will trigger a re-fetch.
+ # We now know that this is a valid candidate verinfo. Accumulate the
+ # share info, if there's enough data present. If not, raise
+ # NeedMoreDataError, which will trigger a re-fetch.
_ignored = unpack_share_data(data)
self.log(" found enough data to add share contents")
self._valid_versions[verinfo][1].add(shnum, (peerid, data))
return
share_prefixes = {}
versionmap = DictOfSets()
+ max_N = 0
for verinfo, (prefix, sharemap) in self._valid_versions.items():
# sharemap is a dict that maps shnums to sets of (peerid,data).
# len(sharemap) is the number of distinct shares that appear to
# be available.
- if len(sharemap) >= self._required_shares:
+ (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
+ max_N = max(max_N, N)
+ if len(sharemap) >= k:
# this one looks retrievable. TODO: our policy of decoding
# the first version that we can get is a bit troublesome: in
# a small grid with a large expansion factor, a single
# no more queries are outstanding. Can we send out more? First,
# should we be looking at more peers?
- self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" %
- (self._total_shares, len(self._peerlist),
+ self.log("need more peers: max_N=%s, peerlist=%d peerlist_limit=%d" %
+ (max_N, len(self._peerlist),
self._peerlist_limit), level=log.UNUSUAL)
- if self._total_shares is not None:
- search_distance = self._total_shares * 2
+ if max_N:
+ search_distance = max_N * 2
else:
search_distance = 20
self.log("search_distance=%d" % search_distance, level=log.UNUSUAL)
def _attempt_decode(self, verinfo, sharemap):
# sharemap is a dict which maps shnum to [(peerid,data)..] sets.
- (seqnum, root_hash, IV, segsize, datalength) = verinfo
+ (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo
- assert len(sharemap) >= self._required_shares, len(sharemap)
+ assert len(sharemap) >= k, len(sharemap)
shares_s = []
for shnum in sorted(sharemap.keys()):
# it's now down to doing FEC and decrypt.
elapsed = time.time() - self._started
self._status.timings["fetch"] = elapsed
- assert len(shares) >= self._required_shares, len(shares)
- d = defer.maybeDeferred(self._decode, shares, segsize, datalength)
+ assert len(shares) >= k, len(shares)
+ d = defer.maybeDeferred(self._decode, shares, segsize, datalength, k, N)
d.addCallback(self._decrypt, IV, seqnum, root_hash)
return d
self.log(" data valid! len=%d" % len(share_data))
return share_data
- def _decode(self, shares_dict, segsize, datalength):
- # we ought to know these values by now
- assert self._required_shares is not None
- assert self._total_shares is not None
+ def _decode(self, shares_dict, segsize, datalength, k, N):
# shares_dict is a dict mapping shnum to share data, but the codec
# wants two lists.
shareids.append(shareid)
shares.append(share)
- assert len(shareids) >= self._required_shares, len(shareids)
+ assert len(shareids) >= k, len(shareids)
# zfec really doesn't want extra shares
- shareids = shareids[:self._required_shares]
- shares = shares[:self._required_shares]
+ shareids = shareids[:k]
+ shares = shares[:k]
fec = codec.CRSDecoder()
- params = "%d-%d-%d" % (segsize,
- self._required_shares, self._total_shares)
+ params = "%d-%d-%d" % (segsize, k, N)
fec.set_serialized_params(params)
self.log("params %s, we have %d shares" % (params, len(shares)))
def _done(buffers):
elapsed = time.time() - started
self._status.timings["decode"] = elapsed
- self._status.set_encoding(self._required_shares, self._total_shares)
+ self._status.set_encoding(k, N)
+
+ # stash these in the MutableFileNode to speed up the next pass
+ self._node._populate_required_shares(k)
+ self._node._populate_total_shares(N)
+
self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
self.log(" joined length %d, datalength %d" %