From: Brian Warner Date: Tue, 19 Mar 2013 00:40:56 +0000 (-0700) Subject: introducer.client: use integer seqnums, not time-based. Closes #1767. X-Git-Tag: allmydata-tahoe-1.10a1~4 X-Git-Url: https://git.rkrishnan.org/listings/(%5B%5E?a=commitdiff_plain;h=3e26c78ee3713e9ffbb3d7d2e725f3d3f9c332ca;p=tahoe-lafs%2Ftahoe-lafs.git introducer.client: use integer seqnums, not time-based. Closes #1767. This stores the sequence number in BASEDIR/announcement-seqnum, and increments it each time any service is published (every service announcement is regenerated with the new sequence number). As everyone knows, time is an illusion, and occasionally goes backwards, so a counter is generally safer (and reveals less information about the node). Later, we'll improve the introducer client to tolerate rollbacks (where, perhaps due to a VM being restarted from an earlier checkpoint, the stored sequence number reverts to an earlier version). --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index e1899971..ad804248 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -163,13 +163,24 @@ class Client(node.Node, pollmixin.PollMixin): if webport: self.init_web(webport) # strports string + def _sequencer(self): + seqnum_s = self.get_config_from_file("announcement-seqnum") + if not seqnum_s: + seqnum_s = "0" + seqnum = int(seqnum_s.strip()) + seqnum += 1 # increment + self.write_config("announcement-seqnum", "%d\n" % seqnum) + nonce = _make_secret().strip() + return seqnum, nonce + def init_introducer_client(self): self.introducer_furl = self.get_config("client", "introducer.furl") ic = IntroducerClient(self.tub, self.introducer_furl, self.nickname, str(allmydata.__full_version__), str(self.OLDEST_SUPPORTED_VERSION), - self.get_app_versions()) + self.get_app_versions(), + self._sequencer) self.introducer_client = ic # hold off on starting the IntroducerClient until our tub has been # started, so we'll have a useful address on our RemoteReference, so diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index 33ce5ad8..f463c120 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -50,7 +50,7 @@ class IntroducerClient(service.Service, Referenceable): def __init__(self, tub, introducer_furl, nickname, my_version, oldest_supported, - app_versions): + app_versions, sequencer): self._tub = tub self.introducer_furl = introducer_furl @@ -59,6 +59,7 @@ class IntroducerClient(service.Service, Referenceable): self._my_version = my_version self._oldest_supported = oldest_supported self._app_versions = app_versions + self._sequencer = sequencer self._my_subscriber_info = { "version": 0, "nickname": self._nickname, @@ -69,7 +70,8 @@ class IntroducerClient(service.Service, Referenceable): self._stub_client = None # for_v1 self._stub_client_furl = None - self._published_announcements = {} + self._outbound_announcements = {} # not signed + self._published_announcements = {} # signed self._canary = Referenceable() self._publisher = None @@ -78,14 +80,14 @@ class IntroducerClient(service.Service, Referenceable): self._subscribed_service_names = set() self._subscriptions = set() # requests we've actually sent - # _current_announcements remembers one announcement per + # _inbound_announcements remembers one announcement per # (servicename,serverid) pair. Anything that arrives with the same # pair will displace the previous one. This stores tuples of # (unpacked announcement dictionary, verifyingkey, rxtime). The ann # dicts can be compared for equality to distinguish re-announcement # from updates. It also provides memory for clients who subscribe # after startup. - self._current_announcements = {} + self._inbound_announcements = {} self.encoding_parameters = None @@ -154,7 +156,7 @@ class IntroducerClient(service.Service, Referenceable): self._local_subscribers.append( (service_name,cb,args,kwargs) ) self._subscribed_service_names.add(service_name) self._maybe_subscribe() - for index,(ann,key_s,when) in self._current_announcements.items(): + for index,(ann,key_s,when) in self._inbound_announcements.items(): servicename = index[0] if servicename == service_name: eventually(cb, key_s, ann, *args, **kwargs) @@ -201,24 +203,33 @@ class IntroducerClient(service.Service, Referenceable): d.addCallback(_publish_stub_client) return d - def create_announcement(self, service_name, ann, signing_key, _mod=None): - full_ann = { "version": 0, - "seqnum": time.time(), - "nickname": self._nickname, - "app-versions": self._app_versions, - "my-version": self._my_version, - "oldest-supported": self._oldest_supported, - - "service-name": service_name, - } - full_ann.update(ann) - if _mod: - full_ann = _mod(full_ann) # for unit tests - return sign_to_foolscap(full_ann, signing_key) + def create_announcement_dict(self, service_name, ann): + ann_d = { "version": 0, + # "seqnum" and "nonce" will be populated with new values in + # publish(), each time we make a change + "nickname": self._nickname, + "app-versions": self._app_versions, + "my-version": self._my_version, + "oldest-supported": self._oldest_supported, + + "service-name": service_name, + } + ann_d.update(ann) + return ann_d def publish(self, service_name, ann, signing_key=None): - ann_t = self.create_announcement(service_name, ann, signing_key) - self._published_announcements[service_name] = ann_t + # we increment the seqnum every time we publish something new + current_seqnum, current_nonce = self._sequencer() + + ann_d = self.create_announcement_dict(service_name, ann) + self._outbound_announcements[service_name] = ann_d + + # publish all announcements with the new seqnum and nonce + for service_name,ann_d in self._outbound_announcements.items(): + ann_d["seqnum"] = current_seqnum + ann_d["nonce"] = current_nonce + ann_t = sign_to_foolscap(ann_d, signing_key) + self._published_announcements[service_name] = ann_t self._maybe_publish() def _maybe_publish(self): @@ -299,8 +310,8 @@ class IntroducerClient(service.Service, Referenceable): index = make_index(ann, key_s) # is this announcement a duplicate? - if (index in self._current_announcements - and self._current_announcements[index][0] == ann): + if (index in self._inbound_announcements + and self._inbound_announcements[index][0] == ann): self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring", service=service_name, description=description, parent=lp2, level=log.UNUSUAL, umid="B1MIdA") @@ -308,12 +319,13 @@ class IntroducerClient(service.Service, Referenceable): return # does it update an existing one? - if index in self._current_announcements: - old,_,_ = self._current_announcements[index] + if index in self._inbound_announcements: + old,_,_ = self._inbound_announcements[index] if "seqnum" in old: # must beat previous sequence number to replace - if "seqnum" not in ann: - self.log("not replacing old announcement, no seqnum: %s" + if ("seqnum" not in ann + or not isinstance(ann["seqnum"], (int,long))): + self.log("not replacing old announcement, no valid seqnum: %s" % (ann,), parent=lp2, level=log.NOISY, umid="zFGH3Q") return @@ -335,7 +347,7 @@ class IntroducerClient(service.Service, Referenceable): self.log("new announcement[%s]" % service_name, parent=lp2, level=log.NOISY) - self._current_announcements[index] = (ann, key_s, time.time()) + self._inbound_announcements[index] = (ann, key_s, time.time()) # note: we never forget an index, but we might update its value for (service_name2,cb,args,kwargs) in self._local_subscribers: diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py index 1a4b7680..adc68af3 100644 --- a/src/allmydata/introducer/server.py +++ b/src/allmydata/introducer/server.py @@ -220,8 +220,9 @@ class IntroducerService(service.MultiService, Referenceable): else: if "seqnum" in old_ann: # must beat previous sequence number to replace - if "seqnum" not in ann: - self.log("not replacing old ann, no seqnum", + if ("seqnum" not in ann + or not isinstance(ann["seqnum"], (int,long))): + self.log("not replacing old ann, no valid seqnum", level=log.NOISY, umid="ySbaVw") self._debug_counts["inbound_no_seqnum"] += 1 return diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index e8eca502..0127d282 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -1,5 +1,5 @@ -import os, re +import os, re, itertools from base64 import b32decode import simplejson @@ -20,7 +20,8 @@ from allmydata.introducer import old # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode from allmydata.web import introweb -from allmydata.util import pollmixin, keyutil, idlib +from allmydata.client import Client as TahoeClient +from allmydata.util import pollmixin, keyutil, idlib, fileutil import allmydata.test.common_util as testutil class LoggingMultiService(service.MultiService): @@ -54,7 +55,7 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): def test_create(self): ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}) + "my_version", "oldest_version", {}, fakeseq) self.failUnless(isinstance(ic, IntroducerClient)) def test_listen(self): @@ -86,12 +87,12 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): i = IntroducerService() ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}) + "my_version", "oldest_version", {}, fakeseq) sk_s, vk_s = keyutil.make_keypair() sk, _ignored = keyutil.parse_privkey(sk_s) keyid = keyutil.remove_prefix(vk_s, "pub-v0-") furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") - ann_t = ic.create_announcement("storage", make_ann(furl1), sk) + ann_t = make_ann_t(ic, furl1, sk, 1) i.remote_publish_v2(ann_t, Referenceable()) announcements = i.get_announcements() self.failUnlessEqual(len(announcements), 1) @@ -112,24 +113,30 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2) +def fakeseq(): + return 1, "nonce" + +seqnum_counter = itertools.count(1) +def realseq(): + return seqnum_counter.next(), str(os.randint(1,100000)) + def make_ann(furl): ann = { "anonymous-storage-FURL": furl, "permutation-seed-base32": get_tubid_string(furl) } return ann def make_ann_t(ic, furl, privkey, seqnum): - def mod(ann): - ann["seqnum"] = seqnum - if seqnum is None: - del ann["seqnum"] - return ann - return ic.create_announcement("storage", make_ann(furl), privkey, mod) + ann_d = ic.create_announcement_dict("storage", make_ann(furl)) + ann_d["seqnum"] = seqnum + ann_d["nonce"] = "nonce" + ann_t = sign_to_foolscap(ann_d, privkey) + return ann_t class Client(unittest.TestCase): def test_duplicate_receive_v1(self): ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}) + "my_version", "oldest_version", {}, fakeseq) announcements = [] ic.subscribe_to("storage", lambda key_s,ann: announcements.append(ann)) @@ -178,12 +185,12 @@ class Client(unittest.TestCase): def test_duplicate_receive_v2(self): ic1 = IntroducerClient(None, "introducer.furl", u"my_nickname", - "ver23", "oldest_version", {}) + "ver23", "oldest_version", {}, fakeseq) # we use a second client just to create a different-looking # announcement ic2 = IntroducerClient(None, "introducer.furl", u"my_nickname", - "ver24","oldest_version",{}) + "ver24","oldest_version",{}, fakeseq) announcements = [] def _received(key_s, ann): announcements.append( (key_s, ann) ) @@ -286,7 +293,7 @@ class Client(unittest.TestCase): # not replace the other) ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version", {}) + "my_version", "oldest_version", {}, fakeseq) announcements = [] ic.subscribe_to("storage", lambda key_s,ann: announcements.append(ann)) @@ -295,7 +302,7 @@ class Client(unittest.TestCase): keyid = keyutil.remove_prefix(vk_s, "pub-v0-") furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid - ann_t = ic.create_announcement("storage", make_ann(furl1), sk) + ann_t = make_ann_t(ic, furl1, sk, 1) ic.remote_announce_v2([ann_t]) d = fireEventually() def _then(ign): @@ -324,7 +331,7 @@ class Server(unittest.TestCase): i = IntroducerService() ic1 = IntroducerClient(None, "introducer.furl", u"my_nickname", - "ver23", "oldest_version", {}) + "ver23", "oldest_version", {}, realseq) furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" privkey_s, _ = keyutil.make_keypair() @@ -334,6 +341,7 @@ class Server(unittest.TestCase): ann1_old = make_ann_t(ic1, furl1, privkey, seqnum=9) ann1_new = make_ann_t(ic1, furl1, privkey, seqnum=11) ann1_noseqnum = make_ann_t(ic1, furl1, privkey, seqnum=None) + ann1_badseqnum = make_ann_t(ic1, furl1, privkey, seqnum="not an int") i.remote_publish_v2(ann1, None) all = i.get_announcements() @@ -385,6 +393,16 @@ class Server(unittest.TestCase): self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) self.failUnlessEqual(i._debug_counts["inbound_update"], 1) + i.remote_publish_v2(ann1_badseqnum, None) + all = i.get_announcements() + self.failUnlessEqual(len(all), 1) + self.failUnlessEqual(all[0].announcement["seqnum"], 11) + self.failUnlessEqual(i._debug_counts["inbound_message"], 6) + self.failUnlessEqual(i._debug_counts["inbound_duplicate"], 1) + self.failUnlessEqual(i._debug_counts["inbound_no_seqnum"], 2) + self.failUnlessEqual(i._debug_counts["inbound_old_replay"], 1) + self.failUnlessEqual(i._debug_counts["inbound_update"], 1) + NICKNAME = u"n\u00EDickname-%s" # LATIN SMALL LETTER I WITH ACUTE @@ -415,7 +433,7 @@ class Queue(SystemTestMixin, unittest.TestCase): tub2 = Tub() tub2.setServiceParent(self.parent) c = IntroducerClient(tub2, ifurl, - u"nickname", "version", "oldest", {}) + u"nickname", "version", "oldest", {}, fakeseq) furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") sk_s, vk_s = keyutil.make_keypair() sk, _ignored = keyutil.parse_privkey(sk_s) @@ -503,7 +521,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): c = IntroducerClient(tub, self.introducer_furl, NICKNAME % str(i), "version", "oldest", - {"component": "component-v1"}) + {"component": "component-v1"}, fakeseq) received_announcements[c] = {} def got(key_s_or_tubid, ann, announcements, i): if i == 0: @@ -822,7 +840,8 @@ class ClientInfo(unittest.TestCase): tub = introducer_furl = None app_versions = {"whizzy": "fizzy"} client_v2 = IntroducerClient(tub, introducer_furl, NICKNAME % u"v2", - "my_version", "oldest", app_versions) + "my_version", "oldest", app_versions, + fakeseq) #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" #ann_s = make_ann_t(client_v2, furl1, None, 10) #introducer.remote_publish_v2(ann_s, Referenceable()) @@ -883,10 +902,11 @@ class Announcements(unittest.TestCase): tub = introducer_furl = None app_versions = {"whizzy": "fizzy"} client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", - "my_version", "oldest", app_versions) + "my_version", "oldest", app_versions, + fakeseq) furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y" - ann_s0 = make_ann_t(client_v2, furl1, None, 10.0) + ann_s0 = make_ann_t(client_v2, furl1, None, 10) canary0 = Referenceable() introducer.remote_publish_v2(ann_s0, canary0) a = introducer.get_announcements() @@ -904,12 +924,13 @@ class Announcements(unittest.TestCase): tub = introducer_furl = None app_versions = {"whizzy": "fizzy"} client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", - "my_version", "oldest", app_versions) + "my_version", "oldest", app_versions, + fakeseq) furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" sk_s, vk_s = keyutil.make_keypair() sk, _ignored = keyutil.parse_privkey(sk_s) pks = keyutil.remove_prefix(vk_s, "pub-") - ann_t0 = make_ann_t(client_v2, furl1, sk, 10.0) + ann_t0 = make_ann_t(client_v2, furl1, sk, 10) canary0 = Referenceable() introducer.remote_publish_v2(ann_t0, canary0) a = introducer.get_announcements() @@ -941,6 +962,51 @@ class Announcements(unittest.TestCase): self.failUnlessEqual(a[0].version, "my_version") self.failUnlessEqual(a[0].announcement["anonymous-storage-FURL"], furl1) +class ClientSeqnums(unittest.TestCase): + def test_client(self): + basedir = "introducer/ClientSeqnums/test_client" + fileutil.make_dirs(basedir) + f = open(os.path.join(basedir, "tahoe.cfg"), "w") + f.write("[client]\n") + f.write("introducer.furl = nope\n") + f.close() + c = TahoeClient(basedir) + ic = c.introducer_client + outbound = ic._outbound_announcements + published = ic._published_announcements + def read_seqnum(): + f = open(os.path.join(basedir, "announcement-seqnum")) + seqnum = f.read().strip() + f.close() + return int(seqnum) + + ic.publish("sA", {"key": "value1"}, c._server_key) + self.failUnlessEqual(read_seqnum(), 1) + self.failUnless("sA" in outbound) + self.failUnlessEqual(outbound["sA"]["seqnum"], 1) + nonce1 = outbound["sA"]["nonce"] + self.failUnless(isinstance(nonce1, str)) + self.failUnlessEqual(simplejson.loads(published["sA"][0]), + outbound["sA"]) + # [1] is the signature, [2] is the pubkey + + # publishing a second service causes both services to be + # re-published, with the next higher sequence number + ic.publish("sB", {"key": "value2"}, c._server_key) + self.failUnlessEqual(read_seqnum(), 2) + self.failUnless("sB" in outbound) + self.failUnlessEqual(outbound["sB"]["seqnum"], 2) + self.failUnless("sA" in outbound) + self.failUnlessEqual(outbound["sA"]["seqnum"], 2) + nonce2 = outbound["sA"]["nonce"] + self.failUnless(isinstance(nonce2, str)) + self.failIfEqual(nonce1, nonce2) + self.failUnlessEqual(simplejson.loads(published["sA"][0]), + outbound["sA"]) + self.failUnlessEqual(simplejson.loads(published["sB"][0]), + outbound["sB"]) + + class TooNewServer(IntroducerService): VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": @@ -969,7 +1035,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) c = IntroducerClient(tub, self.introducer_furl, - u"nickname-client", "version", "oldest", {}) + u"nickname-client", "version", "oldest", {}, + fakeseq) announcements = {} def got(key_s, ann): announcements[key_s] = ann