3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.introducer.interfaces import \
10 RIIntroducerPublisherAndSubscriberService_v2
11 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
12 convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
13 get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
15 class IntroducerNode(node.Node):
16 PORTNUMFILE = "introducer.port"
17 NODETYPE = "introducer"
18 GENERATED_FILES = ['introducer.furl']
20 def __init__(self, basedir="."):
21 node.Node.__init__(self, basedir)
23 self.init_introducer()
24 webport = self.get_config("node", "web.port", None)
26 self.init_web(webport) # strports string
28 def init_introducer(self):
29 introducerservice = IntroducerService(self.basedir)
30 self.add_service(introducerservice)
32 d = self.when_tub_ready()
34 self.introducer_url = self.tub.registerReference(introducerservice,
36 self.log(" introducer is at %s" % self.introducer_url,
38 self.write_config("introducer.furl", self.introducer_url + "\n")
39 d.addCallback(_publish)
40 d.addErrback(log.err, facility="tahoe.init",
41 level=log.BAD, umid="UaNs9A")
43 def init_web(self, webport):
44 self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
46 from allmydata.webish import IntroducerWebishServer
47 nodeurl_path = os.path.join(self.basedir, "node.url")
48 staticdir = self.get_config("node", "web.static", "public_html")
49 staticdir = os.path.expanduser(staticdir)
50 ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
53 class WrapV1SubscriberInV2Interface: # for_v1
54 """I wrap a RemoteReference that points at an old v1 subscriber, enabling
55 it to be treated like a v2 subscriber.
58 def __init__(self, original):
59 self.original = original # also used for tests
60 def __eq__(self, them):
61 return self.original == them
62 def __ne__(self, them):
63 return self.original != them
65 return hash(self.original)
66 def getRemoteTubID(self):
67 return self.original.getRemoteTubID()
68 def getSturdyRef(self):
69 return self.original.getSturdyRef()
71 return self.original.getPeer()
72 def getLocationHints(self):
73 return self.original.getLocationHints()
74 def callRemote(self, methname, *args, **kwargs):
75 m = getattr(self, "wrap_" + methname)
76 return m(*args, **kwargs)
77 def wrap_announce_v2(self, announcements):
78 anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
79 return self.original.callRemote("announce", set(anns_v1))
80 def wrap_set_encoding_parameters(self, parameters):
82 return self.original.callRemote("set_encoding_parameters", parameters)
83 def notifyOnDisconnect(self, *args, **kwargs):
84 return self.original.notifyOnDisconnect(*args, **kwargs)
86 class IntroducerService(service.MultiService, Referenceable):
87 implements(RIIntroducerPublisherAndSubscriberService_v2)
89 # v1 is the original protocol, supported since 1.0 (but only advertised
90 # starting in 1.3). v2 is the new signed protocol, supported after 1.9
91 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
92 "http://allmydata.org/tahoe/protocols/introducer/v2": { },
93 "application-version": str(allmydata.__full_version__),
96 def __init__(self, basedir="."):
97 service.MultiService.__init__(self)
98 self.introducer_url = None
99 # 'index' is (service_name, key_s, tubid), where key_s or tubid is
101 self._announcements = {} # dict of index ->
102 # (ann_t, canary, ann, timestamp)
104 # ann (the announcement dictionary) is cleaned up: nickname is always
105 # unicode, servicename is always ascii, etc, even though
106 # simplejson.loads sometimes returns either
108 # self._subscribers is a dict mapping servicename to subscriptions
109 # 'subscriptions' is a dict mapping rref to a subscription
110 # 'subscription' is a tuple of (subscriber_info, timestamp)
111 # 'subscriber_info' is a dict, provided directly for v2 clients, or
112 # synthesized for v1 clients. The expected keys are:
113 # version, nickname, app-versions, my-version, oldest-supported
114 self._subscribers = {}
116 # self._stub_client_announcements contains the information provided
117 # by v1 clients. We stash this so we can match it up with their
119 self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
121 self._debug_counts = {"inbound_message": 0,
122 "inbound_duplicate": 0,
123 "inbound_no_seqnum": 0,
124 "inbound_old_replay": 0,
126 "outbound_message": 0,
127 "outbound_announcements": 0,
128 "inbound_subscribe": 0}
129 self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
131 def _debug_retired(self, res):
132 self._debug_outstanding -= 1
135 def log(self, *args, **kwargs):
136 if "facility" not in kwargs:
137 kwargs["facility"] = "tahoe.introducer.server"
138 return log.msg(*args, **kwargs)
140 def get_announcements(self, include_stub_clients=True):
141 """Return a list of AnnouncementDescriptor for all announcements"""
143 for (index, (_, canary, ann, when)) in self._announcements.items():
144 if ann["service-name"] == "stub_client":
145 if not include_stub_clients:
147 ad = AnnouncementDescriptor(when, index, canary, ann)
148 announcements.append(ad)
151 def get_subscribers(self):
152 """Return a list of SubscriberDescriptor objects for all subscribers"""
154 for service_name, subscriptions in self._subscribers.items():
155 for rref,(subscriber_info,when) in subscriptions.items():
156 # note that if the subscriber didn't do Tub.setLocation,
157 # tubid will be None. Also, subscribers do not tell us which
158 # pubkey they use; only publishers do that.
159 tubid = rref.getRemoteTubID() or "?"
160 advertised_addresses = rrefutil.hosts_for_rref(rref)
161 remote_address = rrefutil.stringify_remote_address(rref)
162 # these three assume subscriber_info["version"]==0, but
163 # should tolerate other versions
164 if not subscriber_info:
165 # V1 clients that haven't yet sent their stub_info data
167 nickname = subscriber_info.get("nickname", u"?")
168 version = subscriber_info.get("my-version", u"?")
169 app_versions = subscriber_info.get("app-versions", {})
170 # 'when' is the time they subscribed
171 sd = SubscriberDescriptor(service_name, when,
172 nickname, version, app_versions,
173 advertised_addresses, remote_address,
178 def remote_get_version(self):
181 def remote_publish(self, ann_t): # for_v1
182 lp = self.log("introducer: old (v1) announcement published: %s"
183 % (ann_t,), umid="6zGOIw")
184 ann_v2 = convert_announcement_v1_to_v2(ann_t)
185 return self.publish(ann_v2, None, lp)
187 def remote_publish_v2(self, ann_t, canary):
188 lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
189 return self.publish(ann_t, canary, lp)
191 def publish(self, ann_t, canary, lp):
193 self._publish(ann_t, canary, lp)
195 log.err(format="Introducer.remote_publish failed on %(ann)s",
197 level=log.UNUSUAL, parent=lp, umid="620rWA")
200 def _publish(self, ann_t, canary, lp):
201 self._debug_counts["inbound_message"] += 1
202 self.log("introducer: announcement published: %s" % (ann_t,),
204 ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
205 index = make_index(ann, key)
207 service_name = str(ann["service-name"])
208 if service_name == "stub_client": # for_v1
209 self._attach_stub_client(ann, lp)
212 old = self._announcements.get(index)
214 (old_ann_t, canary, old_ann, timestamp) = old
216 self.log("but we already knew it, ignoring", level=log.NOISY,
218 self._debug_counts["inbound_duplicate"] += 1
221 if "seqnum" in old_ann:
222 # must beat previous sequence number to replace
223 if "seqnum" not in ann:
224 self.log("not replacing old ann, no seqnum",
225 level=log.NOISY, umid="ySbaVw")
226 self._debug_counts["inbound_no_seqnum"] += 1
228 if ann["seqnum"] <= old_ann["seqnum"]:
229 self.log("not replacing old ann, new seqnum is too old"
230 " (%s <= %s) (replay attack?)"
231 % (ann["seqnum"], old_ann["seqnum"]),
232 level=log.UNUSUAL, umid="sX7yqQ")
233 self._debug_counts["inbound_old_replay"] += 1
235 # ok, seqnum is newer, allow replacement
236 self.log("old announcement being updated", level=log.NOISY,
238 self._debug_counts["inbound_update"] += 1
239 self._announcements[index] = (ann_t, canary, ann, time.time())
241 # canary.notifyOnDisconnect ...
242 # use a CanaryWatcher? with cw.is_connected()?
243 # actually we just want foolscap to give rref.is_connected(), since
244 # this is only for the status display
246 for s in self._subscribers.get(service_name, []):
247 self._debug_counts["outbound_message"] += 1
248 self._debug_counts["outbound_announcements"] += 1
249 self._debug_outstanding += 1
250 d = s.callRemote("announce_v2", set([ann_t]))
251 d.addBoth(self._debug_retired)
252 d.addErrback(log.err,
253 format="subscriber errored on announcement %(ann)s",
254 ann=ann_t, facility="tahoe.introducer",
255 level=log.UNUSUAL, umid="jfGMXQ")
257 def _attach_stub_client(self, ann, lp):
258 # There might be a v1 subscriber for whom this is a stub_client.
259 # We might have received the subscription before the stub_client
260 # announcement, in which case we now need to fix up the record in
261 # self._subscriptions .
263 # record it for later, in case the stub_client arrived before the
265 subscriber_info = self._get_subscriber_info_from_ann(ann)
266 ann_tubid = get_tubid_string_from_ann(ann)
267 self._stub_client_announcements[ann_tubid] = subscriber_info
269 lp2 = self.log("stub_client announcement, "
270 "looking for matching subscriber",
271 parent=lp, level=log.NOISY, umid="BTywDg")
273 for sn in self._subscribers:
274 s = self._subscribers[sn]
275 for (subscriber, info) in s.items():
276 # we correlate these by looking for a subscriber whose tubid
277 # matches this announcement
278 sub_tubid = subscriber.getRemoteTubID()
279 if sub_tubid == ann_tubid:
280 self.log(format="found a match, nodeid=%(nodeid)s",
282 level=log.NOISY, parent=lp2, umid="xsWs1A")
283 # found a match. Does it need info?
285 self.log(format="replacing info",
286 level=log.NOISY, parent=lp2, umid="m5kxwA")
288 s[subscriber] = (subscriber_info, info[1])
289 # and we don't remember or announce stub_clients beyond what we
290 # need to get the subscriber_info set up
292 def _get_subscriber_info_from_ann(self, ann): # for_v1
293 sinfo = { "version": ann["version"],
294 "nickname": ann["nickname"],
295 "app-versions": ann["app-versions"],
296 "my-version": ann["my-version"],
297 "oldest-supported": ann["oldest-supported"],
301 def remote_subscribe(self, subscriber, service_name): # for_v1
302 self.log("introducer: old (v1) subscription[%s] request at %s"
303 % (service_name, subscriber), umid="hJlGUg")
304 return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
307 def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
308 self.log("introducer: subscription[%s] request at %s"
309 % (service_name, subscriber), umid="U3uzLg")
310 return self.add_subscriber(subscriber, service_name, subscriber_info)
312 def add_subscriber(self, subscriber, service_name, subscriber_info):
313 self._debug_counts["inbound_subscribe"] += 1
314 if service_name not in self._subscribers:
315 self._subscribers[service_name] = {}
316 subscribers = self._subscribers[service_name]
317 if subscriber in subscribers:
318 self.log("but they're already subscribed, ignoring",
319 level=log.UNUSUAL, umid="Sy9EfA")
322 if not subscriber_info: # for_v1
323 # v1 clients don't provide subscriber_info, but they should
324 # publish a 'stub client' record which contains the same
325 # information. If we've already received this, it will be in
326 # self._stub_client_announcements
327 tubid = subscriber.getRemoteTubID()
328 if tubid in self._stub_client_announcements:
329 subscriber_info = self._stub_client_announcements[tubid]
331 subscribers[subscriber] = (subscriber_info, time.time())
333 self.log("introducer: unsubscribing[%s] %s" % (service_name,
336 subscribers.pop(subscriber, None)
337 subscriber.notifyOnDisconnect(_remove)
339 # now tell them about any announcements they're interested in
340 announcements = set( [ ann_t
341 for idx,(ann_t,canary,ann,when)
342 in self._announcements.items()
343 if idx[0] == service_name] )
345 self._debug_counts["outbound_message"] += 1
346 self._debug_counts["outbound_announcements"] += len(announcements)
347 self._debug_outstanding += 1
348 d = subscriber.callRemote("announce_v2", announcements)
349 d.addBoth(self._debug_retired)
350 d.addErrback(log.err,
351 format="subscriber errored during subscribe %(anns)s",
352 anns=announcements, facility="tahoe.introducer",
353 level=log.UNUSUAL, umid="mtZepQ")