From 3e26c78ee3713e9ffbb3d7d2e725f3d3f9c332ca Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Mon, 18 Mar 2013 17:40:56 -0700
Subject: [PATCH] introducer.client: use integer seqnums, not time-based.
 Closes #1767.

This stores the sequence number in BASEDIR/announcement-seqnum, and
increments it each time any service is published (every service
announcement is regenerated with the new sequence number). As everyone
knows, time is an illusion, and occasionally goes backwards, so a
counter is generally safer (and reveals less information about the
node).

Later, we'll improve the introducer client to tolerate rollbacks (where,
perhaps due to a VM being restarted from an earlier checkpoint, the
stored sequence number reverts to an earlier version).
---
 src/allmydata/client.py               |  13 ++-
 src/allmydata/introducer/client.py    |  68 +++++++++------
 src/allmydata/introducer/server.py    |   5 +-
 src/allmydata/test/test_introducer.py | 117 ++++++++++++++++++++------
 4 files changed, 147 insertions(+), 56 deletions(-)

diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index e1899971..ad804248 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -163,13 +163,24 @@ class Client(node.Node, pollmixin.PollMixin):
         if webport:
             self.init_web(webport) # strports string
 
+    def _sequencer(self):
+        seqnum_s = self.get_config_from_file("announcement-seqnum")
+        if not seqnum_s:
+            seqnum_s = "0"
+        seqnum = int(seqnum_s.strip())
+        seqnum += 1 # increment
+        self.write_config("announcement-seqnum", "%d\n" % seqnum)
+        nonce = _make_secret().strip()
+        return seqnum, nonce
+
     def init_introducer_client(self):
         self.introducer_furl = self.get_config("client", "introducer.furl")
         ic = IntroducerClient(self.tub, self.introducer_furl,
                               self.nickname,
                               str(allmydata.__full_version__),
                               str(self.OLDEST_SUPPORTED_VERSION),
-                              self.get_app_versions())
+                              self.get_app_versions(),
+                              self._sequencer)
         self.introducer_client = ic
         # hold off on starting the IntroducerClient until our tub has been
         # started, so we'll have a useful address on our RemoteReference, so
diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py
index 33ce5ad8..f463c120 100644
--- a/src/allmydata/introducer/client.py
+++ b/src/allmydata/introducer/client.py
@@ -50,7 +50,7 @@ class IntroducerClient(service.Service, Referenceable):
 
     def __init__(self, tub, introducer_furl,
                  nickname, my_version, oldest_supported,
-                 app_versions):
+                 app_versions, sequencer):
         self._tub = tub
         self.introducer_furl = introducer_furl
 
@@ -59,6 +59,7 @@ class IntroducerClient(service.Service, Referenceable):
         self._my_version = my_version
         self._oldest_supported = oldest_supported
         self._app_versions = app_versions
+        self._sequencer = sequencer
 
         self._my_subscriber_info = { "version": 0,
                                      "nickname": self._nickname,
@@ -69,7 +70,8 @@ class IntroducerClient(service.Service, Referenceable):
         self._stub_client = None # for_v1
         self._stub_client_furl = None
 
-        self._published_announcements = {}
+        self._outbound_announcements = {} # not signed
+        self._published_announcements = {} # signed
         self._canary = Referenceable()
 
         self._publisher = None
@@ -78,14 +80,14 @@ class IntroducerClient(service.Service, Referenceable):
         self._subscribed_service_names = set()
         self._subscriptions = set() # requests we've actually sent
 
-        # _current_announcements remembers one announcement per
+        # _inbound_announcements remembers one announcement per
         # (servicename,serverid) pair. Anything that arrives with the same
         # pair will displace the previous one. This stores tuples of
         # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
         # dicts can be compared for equality to distinguish re-announcement
         # from updates. It also provides memory for clients who subscribe
         # after startup.
-        self._current_announcements = {}
+        self._inbound_announcements = {}
 
         self.encoding_parameters = None
 
@@ -154,7 +156,7 @@ class IntroducerClient(service.Service, Referenceable):
         self._local_subscribers.append( (service_name,cb,args,kwargs) )
         self._subscribed_service_names.add(service_name)
         self._maybe_subscribe()
-        for index,(ann,key_s,when) in self._current_announcements.items():
+        for index,(ann,key_s,when) in self._inbound_announcements.items():
             servicename = index[0]
             if servicename == service_name:
                 eventually(cb, key_s, ann, *args, **kwargs)
@@ -201,24 +203,33 @@ class IntroducerClient(service.Service, Referenceable):
         d.addCallback(_publish_stub_client)
         return d
 
-    def create_announcement(self, service_name, ann, signing_key, _mod=None):
-        full_ann = { "version": 0,
-                     "seqnum": time.time(),
-                     "nickname": self._nickname,
-                     "app-versions": self._app_versions,
-                     "my-version": self._my_version,
-                     "oldest-supported": self._oldest_supported,
-
-                     "service-name": service_name,
-                     }
-        full_ann.update(ann)
-        if _mod:
-            full_ann = _mod(full_ann) # for unit tests
-        return sign_to_foolscap(full_ann, signing_key)
+    def create_announcement_dict(self, service_name, ann):
+        ann_d = { "version": 0,
+                  # "seqnum" and "nonce" will be populated with new values in
+                  # publish(), each time we make a change
+                  "nickname": self._nickname,
+                  "app-versions": self._app_versions,
+                  "my-version": self._my_version,
+                  "oldest-supported": self._oldest_supported,
+
+                  "service-name": service_name,
+                  }
+        ann_d.update(ann)
+        return ann_d
 
     def publish(self, service_name, ann, signing_key=None):
-        ann_t = self.create_announcement(service_name, ann, signing_key)
-        self._published_announcements[service_name] = ann_t
+        # we increment the seqnum every time we publish something new
+        current_seqnum, current_nonce = self._sequencer()
+
+        ann_d = self.create_announcement_dict(service_name, ann)
+        self._outbound_announcements[service_name] = ann_d
+
+        # publish all announcements with the new seqnum and nonce
+        for service_name,ann_d in self._outbound_announcements.items():
+            ann_d["seqnum"] = current_seqnum
+            ann_d["nonce"] = current_nonce
+            ann_t = sign_to_foolscap(ann_d, signing_key)
+            self._published_announcements[service_name] = ann_t
         self._maybe_publish()
 
     def _maybe_publish(self):
@@ -299,8 +310,8 @@ class IntroducerClient(service.Service, Referenceable):
         index = make_index(ann, key_s)
 
         # is this announcement a duplicate?
-        if (index in self._current_announcements
-            and self._current_announcements[index][0] == ann):
+        if (index in self._inbound_announcements
+            and self._inbound_announcements[index][0] == ann):
             self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
                      service=service_name, description=description,
                      parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
@@ -308,12 +319,13 @@ class IntroducerClient(service.Service, Referenceable):
             return
 
         # does it update an existing one?
-        if index in self._current_announcements:
-            old,_,_ = self._current_announcements[index]
+        if index in self._inbound_announcements:
+            old,_,_ = self._inbound_announcements[index]
             if "seqnum" in old:
                 # must beat previous sequence number to replace
-                if "seqnum" not in ann:
-                    self.log("not replacing old announcement, no seqnum: %s"
+                if ("seqnum" not in ann
+                    or not isinstance(ann["seqnum"], (int,long))):
+                    self.log("not replacing old announcement, no valid seqnum: %s"
                              % (ann,),
                              parent=lp2, level=log.NOISY, umid="zFGH3Q")
                     return
@@ -335,7 +347,7 @@ class IntroducerClient(service.Service, Referenceable):
             self.log("new announcement[%s]" % service_name,
                      parent=lp2, level=log.NOISY)
 
-        self._current_announcements[index] = (ann, key_s, time.time())
+        self._inbound_announcements[index] = (ann, key_s, time.time())
         # note: we never forget an index, but we might update its value
 
         for (service_name2,cb,args,kwargs) in self._local_subscribers:
diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py
index 1a4b7680..adc68af3 100644
--- a/src/allmydata/introducer/server.py
+++ b/src/allmydata/introducer/server.py
@@ -220,8 +220,9 @@ class IntroducerService(service.MultiService, Referenceable):
             else:
                 if "seqnum" in old_ann:
                     # must beat previous sequence number to replace
-                    if "seqnum" not in ann:
-                        self.log("not replacing old ann, no seqnum",
+                    if ("seqnum" not in ann
+                        or not isinstance(ann["seqnum"], (int,long))):
+                        self.log("not replacing old ann, no valid seqnum",
                                  level=log.NOISY, umid="ySbaVw")
                         self._debug_counts["inbound_no_seqnum"] += 1
                         return
diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py
index e8eca502..0127d282 100644
--- a/src/allmydata/test/test_introducer.py
+++ b/src/allmydata/test/test_introducer.py
@@ -1,5 +1,5 @@
 
-import os, re
+import os, re, itertools
 from base64 import b32decode
 import simplejson
 
@@ -20,7 +20,8 @@ from allmydata.introducer import old
 # test compatibility with old introducer .tac files
 from allmydata.introducer import IntroducerNode
 from allmydata.web import introweb
-from allmydata.util import pollmixin, keyutil, idlib
+from allmydata.client import Client as TahoeClient
+from allmydata.util import pollmixin, keyutil, idlib, fileutil
 import allmydata.test.common_util as testutil
 
 class LoggingMultiService(service.MultiService):
@@ -54,7 +55,7 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
 
     def test_create(self):
         ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
-                              "my_version", "oldest_version", {})
+                              "my_version", "oldest_version", {}, fakeseq)
         self.failUnless(isinstance(ic, IntroducerClient))
 
     def test_listen(self):
@@ -86,12 +87,12 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
         i = IntroducerService()
         ic = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
-                              "my_version", "oldest_version", {})
+                              "my_version", "oldest_version", {}, fakeseq)
         sk_s, vk_s = keyutil.make_keypair()
         sk, _ignored = keyutil.parse_privkey(sk_s)
         keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
-        ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+        ann_t = make_ann_t(ic, furl1, sk, 1)
         i.remote_publish_v2(ann_t, Referenceable())
         announcements = i.get_announcements()
         self.failUnlessEqual(len(announcements), 1)
@@ -112,24 +113,30 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
         self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
 
 
+def fakeseq():
+    return 1, "nonce"
+
+seqnum_counter = itertools.count(1)
+def realseq():
+    return seqnum_counter.next(), str(os.randint(1,100000))
+
 def make_ann(furl):
     ann = { "anonymous-storage-FURL": furl,
             "permutation-seed-base32": get_tubid_string(furl) }
     return ann
 
 def make_ann_t(ic, furl, privkey, seqnum):
-    def mod(ann):
-        ann["seqnum"] = seqnum
-        if seqnum is None:
-            del ann["seqnum"]
-        return ann
-    return ic.create_announcement("storage", make_ann(furl), privkey, mod)
+    ann_d = ic.create_announcement_dict("storage", make_ann(furl))
+    ann_d["seqnum"] = seqnum
+    ann_d["nonce"] = "nonce"
+    ann_t = sign_to_foolscap(ann_d, privkey)
+    return ann_t
 
 class Client(unittest.TestCase):
     def test_duplicate_receive_v1(self):
         ic = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
-                              "my_version", "oldest_version", {})
+                              "my_version", "oldest_version", {}, fakeseq)
         announcements = []
         ic.subscribe_to("storage",
                         lambda key_s,ann: announcements.append(ann))
@@ -178,12 +185,12 @@ class Client(unittest.TestCase):
     def test_duplicate_receive_v2(self):
         ic1 = IntroducerClient(None,
                                "introducer.furl", u"my_nickname",
-                               "ver23", "oldest_version", {})
+                               "ver23", "oldest_version", {}, fakeseq)
         # we use a second client just to create a different-looking
         # announcement
         ic2 = IntroducerClient(None,
                                "introducer.furl", u"my_nickname",
-                               "ver24","oldest_version",{})
+                               "ver24","oldest_version",{}, fakeseq)
         announcements = []
         def _received(key_s, ann):
             announcements.append( (key_s, ann) )
@@ -286,7 +293,7 @@ class Client(unittest.TestCase):
         # not replace the other)
         ic = IntroducerClient(None,
                               "introducer.furl", u"my_nickname",
-                              "my_version", "oldest_version", {})
+                              "my_version", "oldest_version", {}, fakeseq)
         announcements = []
         ic.subscribe_to("storage",
                         lambda key_s,ann: announcements.append(ann))
@@ -295,7 +302,7 @@ class Client(unittest.TestCase):
         keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
         furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
-        ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+        ann_t = make_ann_t(ic, furl1, sk, 1)
         ic.remote_announce_v2([ann_t])
         d = fireEventually()
         def _then(ign):
@@ -324,7 +331,7 @@ class Server(unittest.TestCase):
         i = IntroducerService()
         ic1 = IntroducerClient(None,
                                "introducer.furl", u"my_nickname",
-                               "ver23", "oldest_version", {})
+                               "ver23", "oldest_version", {}, realseq)
         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
 
         privkey_s, _ = keyutil.make_keypair()
@@ -334,6 +341,7 @@ class Server(unittest.TestCase):
         ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9)
         ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11)
         ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None)
+        ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int")
 
         i.remote_publish_v2(ann1, None)
         all = i.get_announcements()
@@ -385,6 +393,16 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
         self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
 
+        i.remote_publish_v2(ann1_badseqnum, None)
+        all = i.get_announcements()
+        self.failUnlessEqual(len(all), 1)
+        self.failUnlessEqual(all[0].announcement["seqnum"], 11)
+        self.failUnlessEqual(i._debug_counts["inbound_message"], 6)
+        self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1)
+        self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 2)
+        self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1)
+        self.failUnlessEqual(i._debug_counts["inbound_update"], 1)
+
 
 NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE
 
@@ -415,7 +433,7 @@ class Queue(SystemTestMixin, unittest.TestCase):
         tub2 = Tub()
         tub2.setServiceParent(self.parent)
         c = IntroducerClient(tub2, ifurl,
-                             u"nickname", "version", "oldest", {})
+                             u"nickname", "version", "oldest", {}, fakeseq)
         furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
         sk_s, vk_s = keyutil.make_keypair()
         sk, _ignored = keyutil.parse_privkey(sk_s)
@@ -503,7 +521,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 c = IntroducerClient(tub, self.introducer_furl,
                                      NICKNAME % str(i),
                                      "version", "oldest",
-                                     {"component": "component-v1"})
+                                     {"component": "component-v1"}, fakeseq)
             received_announcements[c] = {}
             def got(key_s_or_tubid, ann, announcements, i):
                 if i == 0:
@@ -822,7 +840,8 @@ class ClientInfo(unittest.TestCase):
         tub = introducer_furl = None
         app_versions = {"whizzy": "fizzy"}
         client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2",
-                                     "my_version", "oldest", app_versions)
+                                     "my_version", "oldest", app_versions,
+                                     fakeseq)
         #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
         #ann_s = make_ann_t(client_v2, furl1, None, 10)
         #introducer.remote_publish_v2(ann_s, Referenceable())
@@ -883,10 +902,11 @@ class Announcements(unittest.TestCase):
         tub = introducer_furl = None
         app_versions = {"whizzy": "fizzy"}
         client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
-                                     "my_version", "oldest", app_versions)
+                                     "my_version", "oldest", app_versions,
+                                     fakeseq)
         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
         tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
-        ann_s0 = make_ann_t(client_v2, furl1, None, 10.0)
+        ann_s0 = make_ann_t(client_v2, furl1, None, 10)
         canary0 = Referenceable()
         introducer.remote_publish_v2(ann_s0, canary0)
         a = introducer.get_announcements()
@@ -904,12 +924,13 @@ class Announcements(unittest.TestCase):
         tub = introducer_furl = None
         app_versions = {"whizzy": "fizzy"}
         client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
-                                     "my_version", "oldest", app_versions)
+                                     "my_version", "oldest", app_versions,
+                                     fakeseq)
         furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
         sk_s, vk_s = keyutil.make_keypair()
         sk, _ignored = keyutil.parse_privkey(sk_s)
         pks = keyutil.remove_prefix(vk_s, "pub-")
-        ann_t0 = make_ann_t(client_v2, furl1, sk, 10.0)
+        ann_t0 = make_ann_t(client_v2, furl1, sk, 10)
         canary0 = Referenceable()
         introducer.remote_publish_v2(ann_t0, canary0)
         a = introducer.get_announcements()
@@ -941,6 +962,51 @@ class Announcements(unittest.TestCase):
         self.failUnlessEqual(a[0].version, "my_version")
         self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1)
 
+class ClientSeqnums(unittest.TestCase):
+    def test_client(self):
+        basedir = "introducer/ClientSeqnums/test_client"
+        fileutil.make_dirs(basedir)
+        f = open(os.path.join(basedir, "tahoe.cfg"), "w")
+        f.write("[client]\n")
+        f.write("introducer.furl = nope\n")
+        f.close()
+        c = TahoeClient(basedir)
+        ic = c.introducer_client
+        outbound = ic._outbound_announcements
+        published = ic._published_announcements
+        def read_seqnum():
+            f = open(os.path.join(basedir, "announcement-seqnum"))
+            seqnum = f.read().strip()
+            f.close()
+            return int(seqnum)
+
+        ic.publish("sA", {"key": "value1"}, c._server_key)
+        self.failUnlessEqual(read_seqnum(), 1)
+        self.failUnless("sA" in outbound)
+        self.failUnlessEqual(outbound["sA"]["seqnum"], 1)
+        nonce1 = outbound["sA"]["nonce"]
+        self.failUnless(isinstance(nonce1, str))
+        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
+                             outbound["sA"])
+        # [1] is the signature, [2] is the pubkey
+
+        # publishing a second service causes both services to be
+        # re-published, with the next higher sequence number
+        ic.publish("sB", {"key": "value2"}, c._server_key)
+        self.failUnlessEqual(read_seqnum(), 2)
+        self.failUnless("sB" in outbound)
+        self.failUnlessEqual(outbound["sB"]["seqnum"], 2)
+        self.failUnless("sA" in outbound)
+        self.failUnlessEqual(outbound["sA"]["seqnum"], 2)
+        nonce2 = outbound["sA"]["nonce"]
+        self.failUnless(isinstance(nonce2, str))
+        self.failIfEqual(nonce1, nonce2)
+        self.failUnlessEqual(simplejson.loads(published["sA"][0]),
+                             outbound["sA"])
+        self.failUnlessEqual(simplejson.loads(published["sB"][0]),
+                             outbound["sB"])
+
+
 
 class TooNewServer(IntroducerService):
     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
@@ -969,7 +1035,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase):
         tub.setLocation("localhost:%d" % portnum)
 
         c = IntroducerClient(tub, self.introducer_furl,
-                             u"nickname-client", "version", "oldest", {})
+                             u"nickname-client", "version", "oldest", {},
+                             fakeseq)
         announcements = {}
         def got(key_s, ann):
             announcements[key_s] = ann
-- 
2.45.2