import time, os.path, textwrap
from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable
import allmydata
from allmydata import node
from allmydata.util import log, rrefutil
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.introducer.interfaces import \
     RIIntroducerPublisherAndSubscriberService_v2
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
     convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
     get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
class FurlFileConflictError(Exception):
    """Raised when both the old public 'introducer.furl' and the new
    'private/introducer.furl' exist, so the old one cannot be migrated
    safely."""
    pass
class IntroducerNode(node.Node):
    """A node that hosts an IntroducerService and, optionally, a web server.

    NOTE(review): this class was rebuilt from a listing that had lost its
    indentation and several individual lines. Statements marked
    "reconstructed" were inferred from the surrounding code and should be
    confirmed against the project's canonical copy of this file.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        """Initialise the base node, then the introducer and the web server.

        basedir: the node's base directory, handed to node.Node.__init__.
        """
        node.Node.__init__(self, basedir)
        # NOTE(review): reconstructed -- exactly one line was lost between
        # the base-class constructor and init_introducer(); presumably the
        # configuration is loaded here, before get_config() is used. Confirm.
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        # NOTE(review): reconstructed guard -- "web.port" defaults to None,
        # so the web server is presumably only started when it is set.
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and publish it once the Tub is ready.

        An old-style public 'introducer.furl' in the base directory is moved
        to 'private/introducer.furl'.

        Raises FurlFileConflictError if both files already exist, because it
        is then unclear which FURL the operator wants to keep.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The text is dedented first and interpolated second. The
                # leading backslash keeps the first line at the same
                # indentation as the rest, so dedent() actually strips it;
                # the "%s" placeholder is now filled with the base directory
                # instead of being shown literally.
                msg = textwrap.dedent("""\
                    This directory (%s) contains both an old public
                    'introducer.furl' file, and a new-style
                    'private/introducer.furl', so I cannot safely remove the old
                    one. Please make sure your desired FURL is in
                    private/introducer.furl, and remove the public file. If this
                    causes your Introducer's FURL to change, you need to inform
                    all grid members so they can update their tahoe.cfg.
                    """) % (self.basedir,)
                raise FurlFileConflictError(msg)
            os.rename(old_public_fn, private_fn)

        d = self.when_tub_ready()
        def _publish(res):
            # NOTE(review): the continuation line of this call was lost.
            # private_fn is otherwise only used for the migration above, so
            # it is presumably the file the FURL is persisted in. Confirm.
            furl = self.tub.registerReference(introducerservice,
                                              furlFile=private_fn)
            self.log(" introducer is at %s" % furl, umid="qF2L9A")
            self.introducer_url = furl # for tests
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Create the introducer's web server.

        webport: strports string taken from the "web.port" config entry.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        # NOTE(review): reconstructed -- the line after the constructor was
        # lost; 'ws' is otherwise unused, and init_introducer() registers
        # its service the same way. Confirm.
        self.add_service(ws)
class WrapV1SubscriberInV2Interface: # for_v1
    """I wrap a RemoteReference that points at an old v1 subscriber, enabling
    it to be treated like a v2 subscriber.

    Equality, hashing and the connection-related accessors are delegated to
    the wrapped reference, so a wrapper can stand in for it as a dict key.
    callRemote() is routed to a local 'wrap_<methname>' method that
    translates the v2 call into its v1 equivalent.
    """

    def __init__(self, original):
        self.original = original # also used for tests

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        # must agree with __eq__, which compares the wrapped reference
        return hash(self.original)

    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def callRemote(self, methname, *args, **kwargs):
        """Dispatch a v2 remote call to the matching wrap_* translator.

        Raises AttributeError if no 'wrap_<methname>' method exists.
        """
        m = getattr(self, "wrap_" + methname)
        return m(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert each v2 announcement to v1 and send them as one set."""
        anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
        return self.original.callRemote("announce", set(anns_v1))

    def wrap_set_encoding_parameters(self, parameters):
        """Forward the parameters unchanged to the v1 subscriber."""
        return self.original.callRemote("set_encoding_parameters", parameters)

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)
class IntroducerService(service.MultiService, Referenceable):
    """Accepts announcements from publishers and relays them to subscribers.

    Both the original (v1) and the signed (v2) introducer protocols are
    served: v1 announcements are converted to v2 on arrival, and v1
    subscribers are wrapped in WrapV1SubscriberInV2Interface.

    NOTE(review): this class was rebuilt from a listing that had lost its
    indentation and several individual lines. Statements marked
    "reconstructed" were inferred from the surrounding code and should be
    confirmed against the project's canonical copy of this file. Where a
    lost continuation line most likely carried only a 'umid=' log
    identifier, the call is closed without one rather than inventing a
    value.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    # NOTE(review): reconstructed -- one class-level line was lost here,
    # presumably the service name. Confirm.
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        """Create an empty introducer with no announcements or subscribers.

        basedir: accepted for interface compatibility; not used here.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        # "inbound_update" is incremented in _publish(), so it must be
        # initialised here along with the other counters.
        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred pass-through: note that one outbound message finished."""
        self._debug_outstanding -= 1
        # hand the result (or failure) on, so the errback added after this
        # callback still sees errors
        return res

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting the facility to this service's."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements"""
        announcements = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if (ann["service-name"] == "stub_client"
                and not include_stub_clients):
                continue
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref,(subscriber_info,when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Return the protocol/version dictionary advertised to clients."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Accept a v1 announcement: convert it to v2 and publish it."""
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Accept a signed v2 announcement together with its canary."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Run _publish(), logging any exception before re-raising it.

        ann_t: the announcement as received (v2 form).
        canary: the publisher's canary, or None for v1 publishers.
        lp: parent log entry for messages about this announcement.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # deliberately broad: the error is only logged here and is
            # always re-raised to the caller
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Record an announcement and forward it to interested subscribers.

        stub_client announcements are only used to fill in subscriber info.
        An announcement identical to the stored one is ignored. If the
        stored one carries a "seqnum", a replacement must carry a larger
        integer "seqnum".
        """
        self._debug_counts["inbound_message"] += 1
        # NOTE(review): the continuation line of this call was lost
        # (presumably a umid= argument); restore it from the canonical file.
        self.log("introducer: announcement published: %s" % (ann_t,))
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Only the old announcement dict is needed. Unpacking the whole
            # tuple into a local named 'canary' (as the damaged listing did)
            # overwrote the caller's canary, so an update stored the
            # previous connection's canary instead of the new one.
            old_ann = old[2]
            if old_ann == ann:
                # NOTE(review): a lost continuation line presumably carried
                # a umid= argument here.
                self.log("but we already knew it, ignoring", level=log.NOISY)
                self._debug_counts["inbound_duplicate"] += 1
                return
            if "seqnum" in old_ann:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], (int,long))):
                    self.log("not replacing old ann, no valid seqnum",
                             level=log.NOISY, umid="ySbaVw")
                    self._debug_counts["inbound_no_seqnum"] += 1
                    return
                if ann["seqnum"] <= old_ann["seqnum"]:
                    self.log("not replacing old ann, new seqnum is too old"
                             " (%s <= %s) (replay attack?)"
                             % (ann["seqnum"], old_ann["seqnum"]),
                             level=log.UNUSUAL, umid="sX7yqQ")
                    self._debug_counts["inbound_old_replay"] += 1
                    return
                # ok, seqnum is newer, allow replacement
            # NOTE(review): a lost continuation line presumably carried a
            # umid= argument here.
            self.log("old announcement being updated", level=log.NOISY)
            self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Use a v1 stub_client announcement to fill in subscriber info."""
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscriptions .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            # iterate over a snapshot, since entries are replaced below
            for (subscriber, info) in list(s.items()):
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # keep the original subscription timestamp
                        s[subscriber] = (subscriber_info, info[1])
        # and we don't remember or announce stub_clients beyond what we
        # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from a stub_client announcement.

        Raises KeyError if the announcement lacks any of the copied keys.
        """
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Subscribe a v1 client, wrapped so it looks like a v2 subscriber."""
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        # v1 clients supply no subscriber_info of their own
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Subscribe a v2 client to announcements for service_name."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Register a subscriber and send it the announcements it wants.

        subscriber: a v2-style reference (possibly a v1 wrapper).
        service_name: the service whose announcements are wanted.
        subscriber_info: dict describing the client, or a false value for
            v1 clients, in which case any stashed stub_client info is used.

        A subscriber already registered for service_name is ignored.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            if tubid in self._stub_client_announcements:
                subscriber_info = self._stub_client_announcements[tubid]

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            # NOTE(review): a lost continuation line presumably carried a
            # umid= argument for this log call.
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                          subscriber))
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        # NOTE(review): reconstructed guard -- one line was lost here;
        # presumably nothing is sent when there is nothing to announce.
        # Confirm against the canonical file.
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")