mutable: improve test coverage, fix bug in privkey fetching, add .finished to stats...
authorBrian Warner <warner@allmydata.com>
Sat, 19 Apr 2008 02:55:12 +0000 (19:55 -0700)
committerBrian Warner <warner@allmydata.com>
Sat, 19 Apr 2008 02:55:12 +0000 (19:55 -0700)
src/allmydata/mutable/node.py
src/allmydata/mutable/servermap.py
src/allmydata/test/test_mutable.py
src/allmydata/web/map-update-status.xhtml
src/allmydata/web/status.py

index 8ca68eb29a962c6baf997ddbfc9f46c75f84f719..63f83e4f2af319e11e78e836f7f72317c2c5b26d 100644 (file)
@@ -61,10 +61,6 @@ class MutableFileNode:
         self._sharemap = {} # known shares, shnum-to-[nodeids]
         self._cache = ResponseCache()
 
-        self._current_data = None # SDMF: we're allowed to cache the contents
-        self._current_roothash = None # ditto
-        self._current_seqnum = None # ditto
-
         # all users of this MutableFileNode go through the serializer. This
         # takes advantage of the fact that Deferreds discard the callbacks
         # that they're done with, so we can keep using the same Deferred
@@ -118,10 +114,6 @@ class MutableFileNode:
             self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
             self._readkey = self._uri.readkey
             self._storage_index = self._uri.storage_index
-            # TODO: seqnum/roothash: really we mean "doesn't matter since
-            # nobody knows about us yet"
-            self._current_seqnum = 0
-            self._current_roothash = "\x00"*32
             return self._upload(initial_contents, None)
         d.addCallback(_generated)
         return d
@@ -157,10 +149,6 @@ class MutableFileNode:
         self._required_shares = required_shares
     def _populate_total_shares(self, total_shares):
         self._total_shares = total_shares
-    def _populate_seqnum(self, seqnum):
-        self._current_seqnum = seqnum
-    def _populate_root_hash(self, root_hash):
-        self._current_roothash = root_hash
 
     def _populate_privkey(self, privkey):
         self._privkey = privkey
index 8d469708e3b0de61424a25f9c90c4432cf2dbfc9..25aa29f4400f9b78b8b028cbc9614cf75e2f2ea0 100644 (file)
@@ -30,15 +30,18 @@ class UpdateStatus:
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
+        self.finished = None
 
-    def add_per_server_time(self, peerid, op, elapsed):
-        assert op in ("query", "privkey")
+    def add_per_server_time(self, peerid, op, sent, elapsed):
+        assert op in ("query", "late", "privkey")
         if peerid not in self.timings["per_server"]:
             self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,elapsed))
+        self.timings["per_server"][peerid].append((op,sent,elapsed))
 
     def get_started(self):
         return self.started
+    def get_finished(self):
+        return self.finished
     def get_storage_index(self):
         return self.storage_index
     def get_mode(self):
@@ -72,6 +75,8 @@ class UpdateStatus:
         self.progress = value
     def set_active(self, value):
         self.active = value
+    def set_finished(self, when):
+        self.finished = when
 
 class ServerMap:
     """I record the placement of mutable shares.
@@ -140,6 +145,10 @@ class ServerMap:
                           (idlib.shortnodeid_b2a(peerid), shnum,
                            seqnum, base32.b2a(root_hash)[:4], k, N,
                            datalength))
+        if self.problems:
+            print >>out, "%d PROBLEMS" % len(self.problems)
+            for f in self.problems:
+                print >>out, str(f)
         return out
 
     def all_peers(self):
@@ -174,7 +183,6 @@ class ServerMap:
             (verinfo, timestamp) = self.servermap[key]
             return verinfo
         return None
-        return None
 
     def shares_available(self):
         """Return a dict that maps verinfo to tuples of
@@ -254,14 +262,38 @@ class ServerMap:
         # Return a dict of versionid -> health, for versions that are
         # unrecoverable and have later seqnums than any recoverable versions.
         # These indicate that a write will lose data.
-        pass
+        versionmap = self.make_versionmap()
+        healths = {} # maps verinfo to (found,k)
+        unrecoverable = set()
+        highest_recoverable_seqnum = -1
+        for (verinfo, shares) in versionmap.items():
+            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+             offsets_tuple) = verinfo
+            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            healths[verinfo] = (len(shnums),k)
+            if len(shnums) < k:
+                unrecoverable.add(verinfo)
+            else:
+                highest_recoverable_seqnum = max(seqnum,
+                                                 highest_recoverable_seqnum)
+
+        newversions = {}
+        for verinfo in unrecoverable:
+            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+             offsets_tuple) = verinfo
+            if seqnum > highest_recoverable_seqnum:
+                newversions[verinfo] = healths[verinfo]
+
+        return newversions
+
 
     def needs_merge(self):
         # return True if there are multiple recoverable versions with the
         # same seqnum, meaning that MutableFileNode.read_best_version is not
         # giving you the whole story, and that using its data to do a
         # subsequent publish will lose information.
-        pass
+        return bool(len(self.recoverable_versions()) > 1)
+
 
 class ServermapUpdater:
     def __init__(self, filenode, servermap, mode=MODE_READ):
@@ -457,13 +489,14 @@ class ServermapUpdater:
                      level=log.NOISY)
         now = time.time()
         elapsed = now - started
-        self._status.add_per_server_time(peerid, "query", elapsed)
         self._queries_outstanding.discard(peerid)
         self._must_query.discard(peerid)
         self._queries_completed += 1
         if not self._running:
             self.log("but we're not running, so we'll ignore it", parent=lp)
+            self._status.add_per_server_time(peerid, "late", started, elapsed)
             return
+        self._status.add_per_server_time(peerid, "query", started, elapsed)
 
         if datavs:
             self._good_peers.add(peerid)
@@ -602,7 +635,7 @@ class ServermapUpdater:
          pubkey, signature, share_hash_chain, block_hash_tree,
          share_data, enc_privkey) = r
 
-        return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
+        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
 
     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
 
@@ -638,7 +671,7 @@ class ServermapUpdater:
     def _got_privkey_results(self, datavs, peerid, shnum, started):
         now = time.time()
         elapsed = now - started
-        self._status.add_per_server_time(peerid, "privkey", elapsed)
+        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
         self._queries_outstanding.discard(peerid)
         if not self._need_privkey:
             return
@@ -853,7 +886,9 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
-        elapsed = time.time() - self._started
+        now = time.time()
+        elapsed = now - self._started
+        self._status.set_finished(now)
         self._status.timings["total"] = elapsed
         self._status.set_progress(1.0)
         self._status.set_status("Done")
index dd41997a1410dd2275982a38ae9d5d27f829a0d9..b3d78815dd4b07dafddef554b21c3870ed6a9c84 100644 (file)
@@ -575,11 +575,14 @@ class Servermap(unittest.TestCase):
         d = self._client.create_mutable_file("New contents go here")
         def _created(node):
             self._fn = node
+            self._fn2 = self._client.create_node_from_uri(node.get_uri())
         d.addCallback(_created)
         return d
 
-    def make_servermap(self, mode=MODE_CHECK):
-        smu = ServermapUpdater(self._fn, ServerMap(), mode)
+    def make_servermap(self, mode=MODE_CHECK, fn=None):
+        if fn is None:
+            fn = self._fn
+        smu = ServermapUpdater(fn, ServerMap(), mode)
         d = smu.update()
         return d
 
@@ -621,6 +624,7 @@ class Servermap(unittest.TestCase):
         d.addCallback(lambda sm: us(sm, mode=MODE_READ))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
         d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
         d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
         d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
@@ -628,6 +632,25 @@ class Servermap(unittest.TestCase):
 
         return d
 
+    def test_fetch_privkey(self):
+        d = defer.succeed(None)
+        # use the sibling filenode (which hasn't been used yet), and make
+        # sure it can fetch the privkey. The file is small, so the privkey
+        # will be fetched on the first (query) pass.
+        d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2))
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+
+        # create a new file, which is large enough to knock the privkey out
+        # of the early part of the fil
+        LARGE = "These are Larger contents" * 200 # about 5KB
+        d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+        def _created(large_fn):
+            large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+            return self.make_servermap(MODE_WRITE, large_fn2)
+        d.addCallback(_created)
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+        return d
+
     def test_mark_bad(self):
         d = defer.succeed(None)
         ms = self.make_servermap
@@ -696,6 +719,7 @@ class Servermap(unittest.TestCase):
         self.failUnlessEqual(best, None)
         self.failUnlessEqual(len(sm.shares_available()), 1)
         self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
+        return sm
 
     def test_not_quite_enough_shares(self):
         s = self._client._storage
@@ -713,6 +737,8 @@ class Servermap(unittest.TestCase):
 
         d.addCallback(lambda res: ms(mode=MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
+        d.addCallback(lambda sm:
+                      self.failUnlessEqual(len(sm.make_sharemap()), 2))
         d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
         d.addCallback(lambda res: ms(mode=MODE_WRITE))
@@ -817,24 +843,32 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
                 if substring:
                     allproblems = [str(f) for f in servermap.problems]
                     self.failUnless(substring in "".join(allproblems))
-                return
+                return servermap
             if should_succeed:
                 d1 = self._fn.download_best_version()
                 d1.addCallback(lambda new_contents:
                                self.failUnlessEqual(new_contents, self.CONTENTS))
-                return d1
             else:
-                return self.shouldFail(NotEnoughSharesError,
-                                       "_corrupt_all(offset=%s)" % (offset,),
-                                       substring,
-                                       self._fn.download_best_version)
+                d1 = self.shouldFail(NotEnoughSharesError,
+                                     "_corrupt_all(offset=%s)" % (offset,),
+                                     substring,
+                                     self._fn.download_best_version)
+            d1.addCallback(lambda res: servermap)
+            return d1
         d.addCallback(_do_retrieve)
         return d
 
     def test_corrupt_all_verbyte(self):
         # when the version byte is not 0, we hit an assertion error in
         # unpack_share().
-        return self._test_corrupt_all(0, "AssertionError")
+        d = self._test_corrupt_all(0, "AssertionError")
+        def _check_servermap(servermap):
+            # and the dump should mention the problems
+            s = StringIO()
+            dump = servermap.dump(s).getvalue()
+            self.failUnless("10 PROBLEMS" in dump, dump)
+        d.addCallback(_check_servermap)
+        return d
 
     def test_corrupt_all_seqnum(self):
         # a corrupt sequence number will trigger a bad signature
@@ -976,8 +1010,6 @@ class MultipleEncodings(unittest.TestCase):
         fn2._pubkey = fn._pubkey
         fn2._privkey = fn._privkey
         fn2._encprivkey = fn._encprivkey
-        fn2._current_seqnum = 0
-        fn2._current_roothash = "\x00" * 32
         # and set the encoding parameters to something completely different
         fn2._required_shares = k
         fn2._total_shares = n
@@ -1095,6 +1127,108 @@ class MultipleEncodings(unittest.TestCase):
         d.addCallback(_retrieved)
         return d
 
+class MultipleVersions(unittest.TestCase):
+    def setUp(self):
+        self.CONTENTS = ["Contents 0",
+                         "Contents 1",
+                         "Contents 2",
+                         "Contents 3a",
+                         "Contents 3b"]
+        self._copied_shares = {}
+        num_peers = 20
+        self._client = FakeClient(num_peers)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+        def _created(node):
+            self._fn = node
+            # now create multiple versions of the same file, and accumulate
+            # their shares, so we can mix and match them later.
+            d = defer.succeed(None)
+            d.addCallback(self._copy_shares, 0)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2
+            d.addCallback(self._copy_shares, 1)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3
+            d.addCallback(self._copy_shares, 2)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a
+            d.addCallback(self._copy_shares, 3)
+            # now we replace all the shares with version s3, and upload a new
+            # version to get s4b.
+            rollback = dict([(i,2) for i in range(10)])
+            d.addCallback(lambda res: self._set_versions(rollback))
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b
+            d.addCallback(self._copy_shares, 4)
+            # we leave the storage in state 4
+            return d
+        d.addCallback(_created)
+        return d
+
+    def _copy_shares(self, ignored, index):
+        shares = self._client._storage._peers
+        # we need a deep copy
+        new_shares = {}
+        for peerid in shares:
+            new_shares[peerid] = {}
+            for shnum in shares[peerid]:
+                new_shares[peerid][shnum] = shares[peerid][shnum]
+        self._copied_shares[index] = new_shares
+
+    def _set_versions(self, versionmap):
+        # versionmap maps shnums to which version (0,1,2,3,4) we want the
+        # share to be at. Any shnum which is left out of the map will stay at
+        # its current version.
+        shares = self._client._storage._peers
+        oldshares = self._copied_shares
+        for peerid in shares:
+            for shnum in shares[peerid]:
+                if shnum in versionmap:
+                    index = versionmap[shnum]
+                    shares[peerid][shnum] = oldshares[index][peerid][shnum]
+
+    def test_multiple_versions(self):
+        # if we see a mix of versions in the grid, download_best_version
+        # should get the latest one
+        self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
+        d = self._fn.download_best_version()
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
+        # but if everything is at version 2, that's what we should download
+        d.addCallback(lambda res:
+                      self._set_versions(dict([(i,2) for i in range(10)])))
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+        # if exactly one share is at version 3, we should still get v2
+        d.addCallback(lambda res:
+                      self._set_versions({0:3}))
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+        # but the servermap should see the unrecoverable version. This
+        # depends upon the single newer share being queried early.
+        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+        def _check_smap(smap):
+            self.failUnlessEqual(len(smap.unrecoverable_versions()), 1)
+            newer = smap.unrecoverable_newer_versions()
+            self.failUnlessEqual(len(newer), 1)
+            verinfo, health = newer.items()[0]
+            self.failUnlessEqual(verinfo[0], 4)
+            self.failUnlessEqual(health, (1,3))
+            self.failIf(smap.needs_merge())
+        d.addCallback(_check_smap)
+        # if we have a mix of two parallel versions (s4a and s4b), we could
+        # recover either
+        d.addCallback(lambda res:
+                      self._set_versions({0:3,2:3,4:3,6:3,8:3,
+                                          1:4,3:4,5:4,7:4,9:4}))
+        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+        def _check_smap_mixed(smap):
+            self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
+            newer = smap.unrecoverable_newer_versions()
+            self.failUnlessEqual(len(newer), 0)
+            self.failUnless(smap.needs_merge())
+        d.addCallback(_check_smap_mixed)
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or
+                                                  res == self.CONTENTS[4]))
+        return d
+
 
 class Utils(unittest.TestCase):
     def test_dict_of_sets(self):
index b51896b1fee224f52b4a6d680e692e2e1bb9006c..69e39117c0d51f412d082a7290d68c09398e701e 100644 (file)
@@ -12,6 +12,7 @@
 
 <ul>
   <li>Started: <span n:render="started"/></li>
+  <li>Finished: <span n:render="finished"/></li>
   <li>Storage Index: <span n:render="si"/></li>
   <li>Helper?: <span n:render="helper"/></li>
   <li>Progress: <span n:render="progress"/></li>
index 0cc7304a63d3864159621febbd490b15f3f1098f..fc91010f66a44ffc22f5282fbc1768714080f77f 100644 (file)
@@ -624,6 +624,15 @@ class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
                                   time.localtime(data.get_started()))
         return started_s
 
+    def render_finished(self, ctx, data):
+        when = data.get_finished()
+        if not when:
+            return "not yet"
+        TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+        started_s = time.strftime(TIME_FORMAT,
+                                  time.localtime(data.get_finished()))
+        return started_s
+
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
         if si_s is None:
@@ -677,11 +686,13 @@ class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
         for peerid in sorted(per_server.keys()):
             peerid_s = idlib.shortnodeid_b2a(peerid)
             times = []
-            for op,t in per_server[peerid]:
+            for op,started,t in per_server[peerid]:
                 if op == "query":
                     times.append( self.render_time(None, t) )
+                elif op == "late":
+                    times.append( "late(" + self.render_time(None, t) + ")" )
                 else:
-                    times.append( "(" + self.render_time(None, t) + ")" )
+                    times.append( "privkey(" + self.render_time(None, t) + ")" )
             times_s = ", ".join(times)
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]