]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/introducer/client.py
introducer.client: use integer seqnums, not time-based. Closes #1767.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
index 33ce5ad86b3c71fcb409ce85bb20e71afec38f6a..f463c1202c0f084f426939153cb92b45959b135e 100644 (file)
@@ -50,7 +50,7 @@ class IntroducerClient(service.Service, Referenceable):
 
     def __init__(self, tub, introducer_furl,
                  nickname, my_version, oldest_supported,
-                 app_versions):
+                 app_versions, sequencer):
         self._tub = tub
         self.introducer_furl = introducer_furl
 
@@ -59,6 +59,7 @@ class IntroducerClient(service.Service, Referenceable):
         self._my_version = my_version
         self._oldest_supported = oldest_supported
         self._app_versions = app_versions
+        self._sequencer = sequencer
 
         self._my_subscriber_info = { "version": 0,
                                      "nickname": self._nickname,
@@ -69,7 +70,8 @@ class IntroducerClient(service.Service, Referenceable):
         self._stub_client = None # for_v1
         self._stub_client_furl = None
 
-        self._published_announcements = {}
+        self._outbound_announcements = {} # not signed
+        self._published_announcements = {} # signed
         self._canary = Referenceable()
 
         self._publisher = None
@@ -78,14 +80,14 @@ class IntroducerClient(service.Service, Referenceable):
         self._subscribed_service_names = set()
         self._subscriptions = set() # requests we've actually sent
 
-        # _current_announcements remembers one announcement per
+        # _inbound_announcements remembers one announcement per
         # (servicename,serverid) pair. Anything that arrives with the same
         # pair will displace the previous one. This stores tuples of
         # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
         # dicts can be compared for equality to distinguish re-announcement
         # from updates. It also provides memory for clients who subscribe
         # after startup.
-        self._current_announcements = {}
+        self._inbound_announcements = {}
 
         self.encoding_parameters = None
 
@@ -154,7 +156,7 @@ class IntroducerClient(service.Service, Referenceable):
         self._local_subscribers.append( (service_name,cb,args,kwargs) )
         self._subscribed_service_names.add(service_name)
         self._maybe_subscribe()
-        for index,(ann,key_s,when) in self._current_announcements.items():
+        for index,(ann,key_s,when) in self._inbound_announcements.items():
             servicename = index[0]
             if servicename == service_name:
                 eventually(cb, key_s, ann, *args, **kwargs)
@@ -201,24 +203,33 @@ class IntroducerClient(service.Service, Referenceable):
         d.addCallback(_publish_stub_client)
         return d
 
-    def create_announcement(self, service_name, ann, signing_key, _mod=None):
-        full_ann = { "version": 0,
-                     "seqnum": time.time(),
-                     "nickname": self._nickname,
-                     "app-versions": self._app_versions,
-                     "my-version": self._my_version,
-                     "oldest-supported": self._oldest_supported,
-
-                     "service-name": service_name,
-                     }
-        full_ann.update(ann)
-        if _mod:
-            full_ann = _mod(full_ann) # for unit tests
-        return sign_to_foolscap(full_ann, signing_key)
+    def create_announcement_dict(self, service_name, ann):
+        ann_d = { "version": 0,
+                  # "seqnum" and "nonce" will be populated with new values in
+                  # publish(), each time we make a change
+                  "nickname": self._nickname,
+                  "app-versions": self._app_versions,
+                  "my-version": self._my_version,
+                  "oldest-supported": self._oldest_supported,
+
+                  "service-name": service_name,
+                  }
+        ann_d.update(ann)
+        return ann_d
 
     def publish(self, service_name, ann, signing_key=None):
-        ann_t = self.create_announcement(service_name, ann, signing_key)
-        self._published_announcements[service_name] = ann_t
+        # we increment the seqnum every time we publish something new
+        current_seqnum, current_nonce = self._sequencer()
+
+        ann_d = self.create_announcement_dict(service_name, ann)
+        self._outbound_announcements[service_name] = ann_d
+
+        # publish all announcements with the new seqnum and nonce
+        for service_name,ann_d in self._outbound_announcements.items():
+            ann_d["seqnum"] = current_seqnum
+            ann_d["nonce"] = current_nonce
+            ann_t = sign_to_foolscap(ann_d, signing_key)
+            self._published_announcements[service_name] = ann_t
         self._maybe_publish()
 
     def _maybe_publish(self):
@@ -299,8 +310,8 @@ class IntroducerClient(service.Service, Referenceable):
         index = make_index(ann, key_s)
 
         # is this announcement a duplicate?
-        if (index in self._current_announcements
-            and self._current_announcements[index][0] == ann):
+        if (index in self._inbound_announcements
+            and self._inbound_announcements[index][0] == ann):
             self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
                      service=service_name, description=description,
                      parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
@@ -308,12 +319,13 @@ class IntroducerClient(service.Service, Referenceable):
             return
 
         # does it update an existing one?
-        if index in self._current_announcements:
-            old,_,_ = self._current_announcements[index]
+        if index in self._inbound_announcements:
+            old,_,_ = self._inbound_announcements[index]
             if "seqnum" in old:
                 # must beat previous sequence number to replace
-                if "seqnum" not in ann:
-                    self.log("not replacing old announcement, no seqnum: %s"
+                if ("seqnum" not in ann
+                    or not isinstance(ann["seqnum"], (int,long))):
+                    self.log("not replacing old announcement, no valid seqnum: %s"
                              % (ann,),
                              parent=lp2, level=log.NOISY, umid="zFGH3Q")
                     return
@@ -335,7 +347,7 @@ class IntroducerClient(service.Service, Referenceable):
             self.log("new announcement[%s]" % service_name,
                      parent=lp2, level=log.NOISY)
 
-        self._current_announcements[index] = (ann, key_s, time.time())
+        self._inbound_announcements[index] = (ann, key_s, time.time())
         # note: we never forget an index, but we might update its value
 
         for (service_name2,cb,args,kwargs) in self._local_subscribers: