From: Brian Warner Date: Tue, 11 Mar 2008 07:26:00 +0000 (-0700) Subject: mutable: tolerate multiple encodings, using whichever version is recoverable first... X-Git-Tag: allmydata-tahoe-0.9.0~21 X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/module-simplejson.encoder.html?a=commitdiff_plain;h=c727348d85188a8fcad0e80867129c56447050c3;p=tahoe-lafs%2Ftahoe-lafs.git mutable: tolerate multiple encodings, using whichever version is recoverable first. Closes #312 --- diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index e24372b8..b9792fde 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -387,22 +387,19 @@ class Retrieve: initial_query_count = 5 - # we might not know how many shares we need yet. - self._required_shares = self._node.get_required_shares() - self._total_shares = self._node.get_total_shares() - # self._valid_versions is a dictionary in which the keys are - # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about - # a new potential version of the file, we check its signature, and - # the valid ones are added to this dictionary. The values of the - # dictionary are (prefix, sharemap) tuples, where 'prefix' is just - # the first part of the share (containing the serialized verinfo), - # for easier comparison. 'sharemap' is a DictOfSets, in which the - # keys are sharenumbers, and the values are sets of (peerid, data) - # tuples. There is a (peerid, data) tuple for every instance of a - # given share that we've seen. The 'data' in this tuple is a full - # copy of the SDMF share, starting with the \x00 version byte and - # continuing through the last byte of sharedata. + # 'verinfo' tuples (seqnum, root_hash, IV, segsize, datalength, k, + # N). Every time we hear about a new potential version of the file, + # we check its signature, and the valid ones are added to this + # dictionary. The values of the dictionary are (prefix, sharemap) + # tuples, where 'prefix' is just the first part of the share + # (containing the serialized verinfo), for easier comparison. + # 'sharemap' is a DictOfSets, in which the keys are sharenumbers, and + # the values are sets of (peerid, data) tuples. There is a (peerid, + # data) tuple for every instance of a given share that we've seen. + # The 'data' in this tuple is a full copy of the SDMF share, starting + # with the \x00 version byte and continuing through the last byte of + # sharedata. self._valid_versions = {} # self._valid_shares is a dict mapping (peerid,data) tuples to @@ -544,7 +541,7 @@ class Retrieve: self._pubkey = self._deserialize_pubkey(pubkey_s) self._node._populate_pubkey(self._pubkey) - verinfo = (seqnum, root_hash, IV, segsize, datalength) #, k, N) + verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N) self._status.sharemap[peerid].add(verinfo) if verinfo not in self._valid_versions: @@ -569,42 +566,9 @@ class Retrieve: k, N, segsize, datalength)) self._valid_versions[verinfo] = (prefix, DictOfSets()) - # and make a note of the other parameters we've just learned - # NOTE: Retrieve needs to be refactored to put k,N in the verinfo - # along with seqnum/etc, to make sure we don't co-mingle shares - # from differently-encoded versions of the same file. - if self._required_shares is None: - self._required_shares = k - self._node._populate_required_shares(k) - if self._total_shares is None: - self._total_shares = N - self._node._populate_total_shares(N) - - # reject shares that don't match our narrow-minded ideas of what - # encoding we're going to use. This addresses the immediate needs of - # ticket #312, by turning the data corruption into unavailability. To - # get back the availability (i.e. make sure that one weird-encoding - # share that happens to come back first doesn't make us ignore the - # rest of the shares), we need to implement the refactoring mentioned - # above. - if k != self._required_shares: - self._status.problems[peerid] = "sh#%d: k=%d, we want %d" \ - % (shnum, k, self._required_shares) - raise CorruptShareError(peerid, shnum, - "share has k=%d, we want k=%d" % - (k, self._required_shares)) - - if N != self._total_shares: - self._status.problems[peerid] = "sh#%d: N=%d, we want %d" \ - % (shnum, N, self._total_shares) - raise CorruptShareError(peerid, shnum, - "share has N=%d, we want N=%d" % - (N, self._total_shares)) - - # we've already seen this pair, and checked the signature so we - # know it's a valid candidate. Accumulate the share info, if - # there's enough data present. If not, raise NeedMoreDataError, - # which will trigger a re-fetch. + # We now know that this is a valid candidate verinfo. Accumulate the + # share info, if there's enough data present. If not, raise + # NeedMoreDataError, which will trigger a re-fetch. _ignored = unpack_share_data(data) self.log(" found enough data to add share contents") self._valid_versions[verinfo][1].add(shnum, (peerid, data)) @@ -625,11 +589,14 @@ class Retrieve: return share_prefixes = {} versionmap = DictOfSets() + max_N = 0 for verinfo, (prefix, sharemap) in self._valid_versions.items(): # sharemap is a dict that maps shnums to sets of (peerid,data). # len(sharemap) is the number of distinct shares that appear to # be available. - if len(sharemap) >= self._required_shares: + (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo + max_N = max(max_N, N) + if len(sharemap) >= k: # this one looks retrievable. TODO: our policy of decoding # the first version that we can get is a bit troublesome: in # a small grid with a large expansion factor, a single @@ -674,11 +641,11 @@ class Retrieve: # no more queries are outstanding. Can we send out more? First, # should we be looking at more peers? - self.log("need more peers: N=%s, peerlist=%d peerlist_limit=%d" % - (self._total_shares, len(self._peerlist), + self.log("need more peers: max_N=%s, peerlist=%d peerlist_limit=%d" % + (max_N, len(self._peerlist), self._peerlist_limit), level=log.UNUSUAL) - if self._total_shares is not None: - search_distance = self._total_shares * 2 + if max_N: + search_distance = max_N * 2 else: search_distance = 20 self.log("search_distance=%d" % search_distance, level=log.UNUSUAL) @@ -721,9 +688,9 @@ class Retrieve: def _attempt_decode(self, verinfo, sharemap): # sharemap is a dict which maps shnum to [(peerid,data)..] sets. - (seqnum, root_hash, IV, segsize, datalength) = verinfo + (seqnum, root_hash, IV, segsize, datalength, k, N) = verinfo - assert len(sharemap) >= self._required_shares, len(sharemap) + assert len(sharemap) >= k, len(sharemap) shares_s = [] for shnum in sorted(sharemap.keys()): @@ -784,8 +751,8 @@ class Retrieve: # it's now down to doing FEC and decrypt. elapsed = time.time() - self._started self._status.timings["fetch"] = elapsed - assert len(shares) >= self._required_shares, len(shares) - d = defer.maybeDeferred(self._decode, shares, segsize, datalength) + assert len(shares) >= k, len(shares) + d = defer.maybeDeferred(self._decode, shares, segsize, datalength, k, N) d.addCallback(self._decrypt, IV, seqnum, root_hash) return d @@ -818,10 +785,7 @@ class Retrieve: self.log(" data valid! len=%d" % len(share_data)) return share_data - def _decode(self, shares_dict, segsize, datalength): - # we ought to know these values by now - assert self._required_shares is not None - assert self._total_shares is not None + def _decode(self, shares_dict, segsize, datalength, k, N): # shares_dict is a dict mapping shnum to share data, but the codec # wants two lists. @@ -830,14 +794,13 @@ class Retrieve: shareids.append(shareid) shares.append(share) - assert len(shareids) >= self._required_shares, len(shareids) + assert len(shareids) >= k, len(shareids) # zfec really doesn't want extra shares - shareids = shareids[:self._required_shares] - shares = shares[:self._required_shares] + shareids = shareids[:k] + shares = shares[:k] fec = codec.CRSDecoder() - params = "%d-%d-%d" % (segsize, - self._required_shares, self._total_shares) + params = "%d-%d-%d" % (segsize, k, N) fec.set_serialized_params(params) self.log("params %s, we have %d shares" % (params, len(shares))) @@ -847,7 +810,12 @@ class Retrieve: def _done(buffers): elapsed = time.time() - started self._status.timings["decode"] = elapsed - self._status.set_encoding(self._required_shares, self._total_shares) + self._status.set_encoding(k, N) + + # stash these in the MutableFileNode to speed up the next pass + self._node._populate_required_shares(k) + self._node._populate_total_shares(N) + self.log(" decode done, %d buffers" % len(buffers)) segment = "".join(buffers) self.log(" joined length %d, datalength %d" % diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index f9380d66..1ebb0df4 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -830,10 +830,8 @@ class Roundtrip(unittest.TestCase): return r3.retrieve() d.addCallback(_retrieve) def _retrieved(new_contents): - ## the current specified behavior is "first version recoverable" - #self.failUnlessEqual(new_contents, contents1) - # the current behavior is "first version seen is sticky" - self.failUnlessEqual(new_contents, contents2) + # the current specified behavior is "first version recoverable" + self.failUnlessEqual(new_contents, contents1) d.addCallback(_retrieved) return d