From: Brian Warner Date: Sat, 19 Apr 2008 02:55:12 +0000 (-0700) Subject: mutable: improve test coverage, fix bug in privkey fetching, add .finished to stats... X-Git-Tag: allmydata-tahoe-1.1.0~214 X-Git-Url: https://git.rkrishnan.org/module-simplejson.decoder.html?a=commitdiff_plain;h=09dcfeae22062d452ff1f4bab34ecff794ff2eac;p=tahoe-lafs%2Ftahoe-lafs.git mutable: improve test coverage, fix bug in privkey fetching, add .finished to stats, remove dead code --- diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py index 8ca68eb2..63f83e4f 100644 --- a/src/allmydata/mutable/node.py +++ b/src/allmydata/mutable/node.py @@ -61,10 +61,6 @@ class MutableFileNode: self._sharemap = {} # known shares, shnum-to-[nodeids] self._cache = ResponseCache() - self._current_data = None # SDMF: we're allowed to cache the contents - self._current_roothash = None # ditto - self._current_seqnum = None # ditto - # all users of this MutableFileNode go through the serializer. This # takes advantage of the fact that Deferreds discard the callbacks # that they're done with, so we can keep using the same Deferred @@ -118,10 +114,6 @@ class MutableFileNode: self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) self._readkey = self._uri.readkey self._storage_index = self._uri.storage_index - # TODO: seqnum/roothash: really we mean "doesn't matter since - # nobody knows about us yet" - self._current_seqnum = 0 - self._current_roothash = "\x00"*32 return self._upload(initial_contents, None) d.addCallback(_generated) return d @@ -157,10 +149,6 @@ class MutableFileNode: self._required_shares = required_shares def _populate_total_shares(self, total_shares): self._total_shares = total_shares - def _populate_seqnum(self, seqnum): - self._current_seqnum = seqnum - def _populate_root_hash(self, root_hash): - self._current_roothash = root_hash def _populate_privkey(self, privkey): self._privkey = privkey diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py index 8d469708..25aa29f4 100644 --- a/src/allmydata/mutable/servermap.py +++ b/src/allmydata/mutable/servermap.py @@ -30,15 +30,18 @@ class UpdateStatus: self.progress = 0.0 self.counter = self.statusid_counter.next() self.started = time.time() + self.finished = None - def add_per_server_time(self, peerid, op, elapsed): - assert op in ("query", "privkey") + def add_per_server_time(self, peerid, op, sent, elapsed): + assert op in ("query", "late", "privkey") if peerid not in self.timings["per_server"]: self.timings["per_server"][peerid] = [] - self.timings["per_server"][peerid].append((op,elapsed)) + self.timings["per_server"][peerid].append((op,sent,elapsed)) def get_started(self): return self.started + def get_finished(self): + return self.finished def get_storage_index(self): return self.storage_index def get_mode(self): @@ -72,6 +75,8 @@ class UpdateStatus: self.progress = value def set_active(self, value): self.active = value + def set_finished(self, when): + self.finished = when class ServerMap: """I record the placement of mutable shares. @@ -140,6 +145,10 @@ class ServerMap: (idlib.shortnodeid_b2a(peerid), shnum, seqnum, base32.b2a(root_hash)[:4], k, N, datalength)) + if self.problems: + print >>out, "%d PROBLEMS" % len(self.problems) + for f in self.problems: + print >>out, str(f) return out def all_peers(self): @@ -174,7 +183,6 @@ class ServerMap: (verinfo, timestamp) = self.servermap[key] return verinfo return None - return None def shares_available(self): """Return a dict that maps verinfo to tuples of @@ -254,14 +262,38 @@ class ServerMap: # Return a dict of versionid -> health, for versions that are # unrecoverable and have later seqnums than any recoverable versions. # These indicate that a write will lose data. - pass + versionmap = self.make_versionmap() + healths = {} # maps verinfo to (found,k) + unrecoverable = set() + highest_recoverable_seqnum = -1 + for (verinfo, shares) in versionmap.items(): + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = verinfo + shnums = set([shnum for (shnum, peerid, timestamp) in shares]) + healths[verinfo] = (len(shnums),k) + if len(shnums) < k: + unrecoverable.add(verinfo) + else: + highest_recoverable_seqnum = max(seqnum, + highest_recoverable_seqnum) + + newversions = {} + for verinfo in unrecoverable: + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = verinfo + if seqnum > highest_recoverable_seqnum: + newversions[verinfo] = healths[verinfo] + + return newversions + def needs_merge(self): # return True if there are multiple recoverable versions with the # same seqnum, meaning that MutableFileNode.read_best_version is not # giving you the whole story, and that using its data to do a # subsequent publish will lose information. - pass + return bool(len(self.recoverable_versions()) > 1) + class ServermapUpdater: def __init__(self, filenode, servermap, mode=MODE_READ): @@ -457,13 +489,14 @@ class ServermapUpdater: level=log.NOISY) now = time.time() elapsed = now - started - self._status.add_per_server_time(peerid, "query", elapsed) self._queries_outstanding.discard(peerid) self._must_query.discard(peerid) self._queries_completed += 1 if not self._running: self.log("but we're not running, so we'll ignore it", parent=lp) + self._status.add_per_server_time(peerid, "late", started, elapsed) return + self._status.add_per_server_time(peerid, "query", started, elapsed) if datavs: self._good_peers.add(peerid) @@ -602,7 +635,7 @@ class ServermapUpdater: pubkey, signature, share_hash_chain, block_hash_tree, share_data, enc_privkey) = r - return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum) + return self._try_to_validate_privkey(enc_privkey, peerid, shnum) def _try_to_validate_privkey(self, enc_privkey, peerid, shnum): @@ -638,7 +671,7 @@ class ServermapUpdater: def _got_privkey_results(self, datavs, peerid, shnum, started): now = time.time() elapsed = now - started - self._status.add_per_server_time(peerid, "privkey", elapsed) + self._status.add_per_server_time(peerid, "privkey", started, elapsed) self._queries_outstanding.discard(peerid) if not self._need_privkey: return @@ -853,7 +886,9 @@ class ServermapUpdater: if not self._running: return self._running = False - elapsed = time.time() - self._started + now = time.time() + elapsed = now - self._started + self._status.set_finished(now) self._status.timings["total"] = elapsed self._status.set_progress(1.0) self._status.set_status("Done") diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index dd41997a..b3d78815 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -575,11 +575,14 @@ class Servermap(unittest.TestCase): d = self._client.create_mutable_file("New contents go here") def _created(node): self._fn = node + self._fn2 = self._client.create_node_from_uri(node.get_uri()) d.addCallback(_created) return d - def make_servermap(self, mode=MODE_CHECK): - smu = ServermapUpdater(self._fn, ServerMap(), mode) + def make_servermap(self, mode=MODE_CHECK, fn=None): + if fn is None: + fn = self._fn + smu = ServermapUpdater(fn, ServerMap(), mode) d = smu.update() return d @@ -621,6 +624,7 @@ class Servermap(unittest.TestCase): d.addCallback(lambda sm: us(sm, mode=MODE_READ)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) d.addCallback(lambda sm: us(sm, mode=MODE_WRITE)) + d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) d.addCallback(lambda sm: us(sm, mode=MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING)) @@ -628,6 +632,25 @@ class Servermap(unittest.TestCase): return d + def test_fetch_privkey(self): + d = defer.succeed(None) + # use the sibling filenode (which hasn't been used yet), and make + # sure it can fetch the privkey. The file is small, so the privkey + # will be fetched on the first (query) pass. + d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2)) + d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) + + # create a new file, which is large enough to knock the privkey out + # of the early part of the fil + LARGE = "These are Larger contents" * 200 # about 5KB + d.addCallback(lambda res: self._client.create_mutable_file(LARGE)) + def _created(large_fn): + large_fn2 = self._client.create_node_from_uri(large_fn.get_uri()) + return self.make_servermap(MODE_WRITE, large_fn2) + d.addCallback(_created) + d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) + return d + def test_mark_bad(self): d = defer.succeed(None) ms = self.make_servermap @@ -696,6 +719,7 @@ class Servermap(unittest.TestCase): self.failUnlessEqual(best, None) self.failUnlessEqual(len(sm.shares_available()), 1) self.failUnlessEqual(sm.shares_available().values()[0], (2,3) ) + return sm def test_not_quite_enough_shares(self): s = self._client._storage @@ -713,6 +737,8 @@ class Servermap(unittest.TestCase): d.addCallback(lambda res: ms(mode=MODE_CHECK)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) + d.addCallback(lambda sm: + self.failUnlessEqual(len(sm.make_sharemap()), 2)) d.addCallback(lambda res: ms(mode=MODE_ANYTHING)) d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) d.addCallback(lambda res: ms(mode=MODE_WRITE)) @@ -817,24 +843,32 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): if substring: allproblems = [str(f) for f in servermap.problems] self.failUnless(substring in "".join(allproblems)) - return + return servermap if should_succeed: d1 = self._fn.download_best_version() d1.addCallback(lambda new_contents: self.failUnlessEqual(new_contents, self.CONTENTS)) - return d1 else: - return self.shouldFail(NotEnoughSharesError, - "_corrupt_all(offset=%s)" % (offset,), - substring, - self._fn.download_best_version) + d1 = self.shouldFail(NotEnoughSharesError, + "_corrupt_all(offset=%s)" % (offset,), + substring, + self._fn.download_best_version) + d1.addCallback(lambda res: servermap) + return d1 d.addCallback(_do_retrieve) return d def test_corrupt_all_verbyte(self): # when the version byte is not 0, we hit an assertion error in # unpack_share(). - return self._test_corrupt_all(0, "AssertionError") + d = self._test_corrupt_all(0, "AssertionError") + def _check_servermap(servermap): + # and the dump should mention the problems + s = StringIO() + dump = servermap.dump(s).getvalue() + self.failUnless("10 PROBLEMS" in dump, dump) + d.addCallback(_check_servermap) + return d def test_corrupt_all_seqnum(self): # a corrupt sequence number will trigger a bad signature @@ -976,8 +1010,6 @@ class MultipleEncodings(unittest.TestCase): fn2._pubkey = fn._pubkey fn2._privkey = fn._privkey fn2._encprivkey = fn._encprivkey - fn2._current_seqnum = 0 - fn2._current_roothash = "\x00" * 32 # and set the encoding parameters to something completely different fn2._required_shares = k fn2._total_shares = n @@ -1095,6 +1127,108 @@ class MultipleEncodings(unittest.TestCase): d.addCallback(_retrieved) return d +class MultipleVersions(unittest.TestCase): + def setUp(self): + self.CONTENTS = ["Contents 0", + "Contents 1", + "Contents 2", + "Contents 3a", + "Contents 3b"] + self._copied_shares = {} + num_peers = 20 + self._client = FakeClient(num_peers) + self._storage = self._client._storage + d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1 + def _created(node): + self._fn = node + # now create multiple versions of the same file, and accumulate + # their shares, so we can mix and match them later. + d = defer.succeed(None) + d.addCallback(self._copy_shares, 0) + d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2 + d.addCallback(self._copy_shares, 1) + d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3 + d.addCallback(self._copy_shares, 2) + d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a + d.addCallback(self._copy_shares, 3) + # now we replace all the shares with version s3, and upload a new + # version to get s4b. + rollback = dict([(i,2) for i in range(10)]) + d.addCallback(lambda res: self._set_versions(rollback)) + d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b + d.addCallback(self._copy_shares, 4) + # we leave the storage in state 4 + return d + d.addCallback(_created) + return d + + def _copy_shares(self, ignored, index): + shares = self._client._storage._peers + # we need a deep copy + new_shares = {} + for peerid in shares: + new_shares[peerid] = {} + for shnum in shares[peerid]: + new_shares[peerid][shnum] = shares[peerid][shnum] + self._copied_shares[index] = new_shares + + def _set_versions(self, versionmap): + # versionmap maps shnums to which version (0,1,2,3,4) we want the + # share to be at. Any shnum which is left out of the map will stay at + # its current version. + shares = self._client._storage._peers + oldshares = self._copied_shares + for peerid in shares: + for shnum in shares[peerid]: + if shnum in versionmap: + index = versionmap[shnum] + shares[peerid][shnum] = oldshares[index][peerid][shnum] + + def test_multiple_versions(self): + # if we see a mix of versions in the grid, download_best_version + # should get the latest one + self._set_versions(dict([(i,2) for i in (0,2,4,6,8)])) + d = self._fn.download_best_version() + d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4])) + # but if everything is at version 2, that's what we should download + d.addCallback(lambda res: + self._set_versions(dict([(i,2) for i in range(10)]))) + d.addCallback(lambda res: self._fn.download_best_version()) + d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2])) + # if exactly one share is at version 3, we should still get v2 + d.addCallback(lambda res: + self._set_versions({0:3})) + d.addCallback(lambda res: self._fn.download_best_version()) + d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2])) + # but the servermap should see the unrecoverable version. This + # depends upon the single newer share being queried early. + d.addCallback(lambda res: self._fn.get_servermap(MODE_READ)) + def _check_smap(smap): + self.failUnlessEqual(len(smap.unrecoverable_versions()), 1) + newer = smap.unrecoverable_newer_versions() + self.failUnlessEqual(len(newer), 1) + verinfo, health = newer.items()[0] + self.failUnlessEqual(verinfo[0], 4) + self.failUnlessEqual(health, (1,3)) + self.failIf(smap.needs_merge()) + d.addCallback(_check_smap) + # if we have a mix of two parallel versions (s4a and s4b), we could + # recover either + d.addCallback(lambda res: + self._set_versions({0:3,2:3,4:3,6:3,8:3, + 1:4,3:4,5:4,7:4,9:4})) + d.addCallback(lambda res: self._fn.get_servermap(MODE_READ)) + def _check_smap_mixed(smap): + self.failUnlessEqual(len(smap.unrecoverable_versions()), 0) + newer = smap.unrecoverable_newer_versions() + self.failUnlessEqual(len(newer), 0) + self.failUnless(smap.needs_merge()) + d.addCallback(_check_smap_mixed) + d.addCallback(lambda res: self._fn.download_best_version()) + d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or + res == self.CONTENTS[4])) + return d + class Utils(unittest.TestCase): def test_dict_of_sets(self): diff --git a/src/allmydata/web/map-update-status.xhtml b/src/allmydata/web/map-update-status.xhtml index b51896b1..69e39117 100644 --- a/src/allmydata/web/map-update-status.xhtml +++ b/src/allmydata/web/map-update-status.xhtml @@ -12,6 +12,7 @@