def __init__(self, tub, introducer_furl,
nickname, my_version, oldest_supported,
- app_versions):
+ app_versions, sequencer):
self._tub = tub
self.introducer_furl = introducer_furl
self._my_version = my_version
self._oldest_supported = oldest_supported
self._app_versions = app_versions
+ self._sequencer = sequencer
self._my_subscriber_info = { "version": 0,
"nickname": self._nickname,
self._stub_client = None # for_v1
self._stub_client_furl = None
- self._published_announcements = {}
+ self._outbound_announcements = {} # not signed
+ self._published_announcements = {} # signed
self._canary = Referenceable()
self._publisher = None
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
- # _current_announcements remembers one announcement per
+ # _inbound_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
# pair will displace the previous one. This stores tuples of
# (unpacked announcement dictionary, verifyingkey, rxtime). The ann
# dicts can be compared for equality to distinguish re-announcement
# from updates. It also provides memory for clients who subscribe
# after startup.
- self._current_announcements = {}
+ self._inbound_announcements = {}
self.encoding_parameters = None
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
- for index,(ann,key_s,when) in self._current_announcements.items():
+ for index,(ann,key_s,when) in self._inbound_announcements.items():
servicename = index[0]
if servicename == service_name:
eventually(cb, key_s, ann, *args, **kwargs)
d.addCallback(_publish_stub_client)
return d
- def create_announcement(self, service_name, ann, signing_key, _mod=None):
- full_ann = { "version": 0,
- "seqnum": time.time(),
- "nickname": self._nickname,
- "app-versions": self._app_versions,
- "my-version": self._my_version,
- "oldest-supported": self._oldest_supported,
-
- "service-name": service_name,
- }
- full_ann.update(ann)
- if _mod:
- full_ann = _mod(full_ann) # for unit tests
- return sign_to_foolscap(full_ann, signing_key)
+ def create_announcement_dict(self, service_name, ann):
+ ann_d = { "version": 0,
+ # "seqnum" and "nonce" will be populated with new values in
+ # publish(), each time we make a change
+ "nickname": self._nickname,
+ "app-versions": self._app_versions,
+ "my-version": self._my_version,
+ "oldest-supported": self._oldest_supported,
+
+ "service-name": service_name,
+ }
+ ann_d.update(ann)
+ return ann_d
def publish(self, service_name, ann, signing_key=None):
- ann_t = self.create_announcement(service_name, ann, signing_key)
- self._published_announcements[service_name] = ann_t
+ # we increment the seqnum every time we publish something new
+ current_seqnum, current_nonce = self._sequencer()
+
+ ann_d = self.create_announcement_dict(service_name, ann)
+ self._outbound_announcements[service_name] = ann_d
+
+ # publish all announcements with the new seqnum and nonce
+ for service_name,ann_d in self._outbound_announcements.items():
+ ann_d["seqnum"] = current_seqnum
+ ann_d["nonce"] = current_nonce
+ ann_t = sign_to_foolscap(ann_d, signing_key)
+ self._published_announcements[service_name] = ann_t
self._maybe_publish()
def _maybe_publish(self):
index = make_index(ann, key_s)
# is this announcement a duplicate?
- if (index in self._current_announcements
- and self._current_announcements[index][0] == ann):
+ if (index in self._inbound_announcements
+ and self._inbound_announcements[index][0] == ann):
self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
service=service_name, description=description,
parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
return
# does it update an existing one?
- if index in self._current_announcements:
- old,_,_ = self._current_announcements[index]
+ if index in self._inbound_announcements:
+ old,_,_ = self._inbound_announcements[index]
if "seqnum" in old:
# must beat previous sequence number to replace
- if "seqnum" not in ann:
- self.log("not replacing old announcement, no seqnum: %s"
+ if ("seqnum" not in ann
+ or not isinstance(ann["seqnum"], (int,long))):
+ self.log("not replacing old announcement, no valid seqnum: %s"
% (ann,),
parent=lp2, level=log.NOISY, umid="zFGH3Q")
return
self.log("new announcement[%s]" % service_name,
parent=lp2, level=log.NOISY)
- self._current_announcements[index] = (ann, key_s, time.time())
+ self._inbound_announcements[index] = (ann, key_s, time.time())
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers: