2 import time, os.path, textwrap
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.util.fileutil import abspath_expanduser_unicode
10 from allmydata.introducer.interfaces import \
11 RIIntroducerPublisherAndSubscriberService_v2
12 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
13 convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
14 get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
16 class FurlFileConflictError(Exception):
19 class IntroducerNode(node.Node):
20 PORTNUMFILE = "introducer.port"
21 NODETYPE = "introducer"
22 GENERATED_FILES = ['introducer.furl']
24 def __init__(self, basedir=u"."):
25 node.Node.__init__(self, basedir)
27 self.init_introducer()
28 webport = self.get_config("node", "web.port", None)
30 self.init_web(webport) # strports string
32 def init_introducer(self):
33 introducerservice = IntroducerService(self.basedir)
34 self.add_service(introducerservice)
36 old_public_fn = os.path.join(self.basedir, u"introducer.furl")
37 private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")
39 if os.path.exists(old_public_fn):
40 if os.path.exists(private_fn):
41 msg = """This directory (%s) contains both an old public
42 'introducer.furl' file, and a new-style
43 'private/introducer.furl', so I cannot safely remove the old
44 one. Please make sure your desired FURL is in
45 private/introducer.furl, and remove the public file. If this
46 causes your Introducer's FURL to change, you need to inform
47 all grid members so they can update their tahoe.cfg.
49 raise FurlFileConflictError(textwrap.dedent(msg))
50 os.rename(old_public_fn, private_fn)
51 d = self.when_tub_ready()
53 furl = self.tub.registerReference(introducerservice,
55 self.log(" introducer is at %s" % furl, umid="qF2L9A")
56 self.introducer_url = furl # for tests
57 d.addCallback(_publish)
58 d.addErrback(log.err, facility="tahoe.init",
59 level=log.BAD, umid="UaNs9A")
61 def init_web(self, webport):
62 self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
64 from allmydata.webish import IntroducerWebishServer
65 nodeurl_path = os.path.join(self.basedir, u"node.url")
66 config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
67 staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
68 ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
71 class WrapV1SubscriberInV2Interface: # for_v1
72 """I wrap a RemoteReference that points at an old v1 subscriber, enabling
73 it to be treated like a v2 subscriber.
76 def __init__(self, original):
77 self.original = original # also used for tests
78 def __eq__(self, them):
79 return self.original == them
80 def __ne__(self, them):
81 return self.original != them
83 return hash(self.original)
84 def getRemoteTubID(self):
85 return self.original.getRemoteTubID()
86 def getSturdyRef(self):
87 return self.original.getSturdyRef()
89 return self.original.getPeer()
90 def getLocationHints(self):
91 return self.original.getLocationHints()
92 def callRemote(self, methname, *args, **kwargs):
93 m = getattr(self, "wrap_" + methname)
94 return m(*args, **kwargs)
95 def wrap_announce_v2(self, announcements):
96 anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
97 return self.original.callRemote("announce", set(anns_v1))
98 def wrap_set_encoding_parameters(self, parameters):
100 return self.original.callRemote("set_encoding_parameters", parameters)
101 def notifyOnDisconnect(self, *args, **kwargs):
102 return self.original.notifyOnDisconnect(*args, **kwargs)
104 class IntroducerService(service.MultiService, Referenceable):
105 implements(RIIntroducerPublisherAndSubscriberService_v2)
107 # v1 is the original protocol, supported since 1.0 (but only advertised
108 # starting in 1.3). v2 is the new signed protocol, supported after 1.9
109 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
110 "http://allmydata.org/tahoe/protocols/introducer/v2": { },
111 "application-version": str(allmydata.__full_version__),
114 def __init__(self, basedir="."):
115 service.MultiService.__init__(self)
116 self.introducer_url = None
117 # 'index' is (service_name, key_s, tubid), where key_s or tubid is
119 self._announcements = {} # dict of index ->
120 # (ann_t, canary, ann, timestamp)
122 # ann (the announcement dictionary) is cleaned up: nickname is always
123 # unicode, servicename is always ascii, etc, even though
124 # simplejson.loads sometimes returns either
126 # self._subscribers is a dict mapping servicename to subscriptions
127 # 'subscriptions' is a dict mapping rref to a subscription
128 # 'subscription' is a tuple of (subscriber_info, timestamp)
129 # 'subscriber_info' is a dict, provided directly for v2 clients, or
130 # synthesized for v1 clients. The expected keys are:
131 # version, nickname, app-versions, my-version, oldest-supported
132 self._subscribers = {}
134 # self._stub_client_announcements contains the information provided
135 # by v1 clients. We stash this so we can match it up with their
137 self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
139 self._debug_counts = {"inbound_message": 0,
140 "inbound_duplicate": 0,
141 "inbound_no_seqnum": 0,
142 "inbound_old_replay": 0,
144 "outbound_message": 0,
145 "outbound_announcements": 0,
146 "inbound_subscribe": 0}
147 self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
149 def _debug_retired(self, res):
150 self._debug_outstanding -= 1
153 def log(self, *args, **kwargs):
154 if "facility" not in kwargs:
155 kwargs["facility"] = "tahoe.introducer.server"
156 return log.msg(*args, **kwargs)
158 def get_announcements(self, include_stub_clients=True):
159 """Return a list of AnnouncementDescriptor for all announcements"""
161 for (index, (_, canary, ann, when)) in self._announcements.items():
162 if ann["service-name"] == "stub_client":
163 if not include_stub_clients:
165 ad = AnnouncementDescriptor(when, index, canary, ann)
166 announcements.append(ad)
169 def get_subscribers(self):
170 """Return a list of SubscriberDescriptor objects for all subscribers"""
172 for service_name, subscriptions in self._subscribers.items():
173 for rref,(subscriber_info,when) in subscriptions.items():
174 # note that if the subscriber didn't do Tub.setLocation,
175 # tubid will be None. Also, subscribers do not tell us which
176 # pubkey they use; only publishers do that.
177 tubid = rref.getRemoteTubID() or "?"
178 advertised_addresses = rrefutil.hosts_for_rref(rref)
179 remote_address = rrefutil.stringify_remote_address(rref)
180 # these three assume subscriber_info["version"]==0, but
181 # should tolerate other versions
182 if not subscriber_info:
183 # V1 clients that haven't yet sent their stub_info data
185 nickname = subscriber_info.get("nickname", u"?")
186 version = subscriber_info.get("my-version", u"?")
187 app_versions = subscriber_info.get("app-versions", {})
188 # 'when' is the time they subscribed
189 sd = SubscriberDescriptor(service_name, when,
190 nickname, version, app_versions,
191 advertised_addresses, remote_address,
196 def remote_get_version(self):
199 def remote_publish(self, ann_t): # for_v1
200 lp = self.log("introducer: old (v1) announcement published: %s"
201 % (ann_t,), umid="6zGOIw")
202 ann_v2 = convert_announcement_v1_to_v2(ann_t)
203 return self.publish(ann_v2, None, lp)
205 def remote_publish_v2(self, ann_t, canary):
206 lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
207 return self.publish(ann_t, canary, lp)
209 def publish(self, ann_t, canary, lp):
211 self._publish(ann_t, canary, lp)
213 log.err(format="Introducer.remote_publish failed on %(ann)s",
215 level=log.UNUSUAL, parent=lp, umid="620rWA")
218 def _publish(self, ann_t, canary, lp):
219 self._debug_counts["inbound_message"] += 1
220 self.log("introducer: announcement published: %s" % (ann_t,),
222 ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
223 index = make_index(ann, key)
225 service_name = str(ann["service-name"])
226 if service_name == "stub_client": # for_v1
227 self._attach_stub_client(ann, lp)
230 old = self._announcements.get(index)
232 (old_ann_t, canary, old_ann, timestamp) = old
234 self.log("but we already knew it, ignoring", level=log.NOISY,
236 self._debug_counts["inbound_duplicate"] += 1
239 if "seqnum" in old_ann:
240 # must beat previous sequence number to replace
241 if ("seqnum" not in ann
242 or not isinstance(ann["seqnum"], (int,long))):
243 self.log("not replacing old ann, no valid seqnum",
244 level=log.NOISY, umid="ySbaVw")
245 self._debug_counts["inbound_no_seqnum"] += 1
247 if ann["seqnum"] <= old_ann["seqnum"]:
248 self.log("not replacing old ann, new seqnum is too old"
249 " (%s <= %s) (replay attack?)"
250 % (ann["seqnum"], old_ann["seqnum"]),
251 level=log.UNUSUAL, umid="sX7yqQ")
252 self._debug_counts["inbound_old_replay"] += 1
254 # ok, seqnum is newer, allow replacement
255 self.log("old announcement being updated", level=log.NOISY,
257 self._debug_counts["inbound_update"] += 1
258 self._announcements[index] = (ann_t, canary, ann, time.time())
260 # canary.notifyOnDisconnect ...
261 # use a CanaryWatcher? with cw.is_connected()?
262 # actually we just want foolscap to give rref.is_connected(), since
263 # this is only for the status display
265 for s in self._subscribers.get(service_name, []):
266 self._debug_counts["outbound_message"] += 1
267 self._debug_counts["outbound_announcements"] += 1
268 self._debug_outstanding += 1
269 d = s.callRemote("announce_v2", set([ann_t]))
270 d.addBoth(self._debug_retired)
271 d.addErrback(log.err,
272 format="subscriber errored on announcement %(ann)s",
273 ann=ann_t, facility="tahoe.introducer",
274 level=log.UNUSUAL, umid="jfGMXQ")
276 def _attach_stub_client(self, ann, lp):
277 # There might be a v1 subscriber for whom this is a stub_client.
278 # We might have received the subscription before the stub_client
279 # announcement, in which case we now need to fix up the record in
280 # self._subscriptions .
282 # record it for later, in case the stub_client arrived before the
284 subscriber_info = self._get_subscriber_info_from_ann(ann)
285 ann_tubid = get_tubid_string_from_ann(ann)
286 self._stub_client_announcements[ann_tubid] = subscriber_info
288 lp2 = self.log("stub_client announcement, "
289 "looking for matching subscriber",
290 parent=lp, level=log.NOISY, umid="BTywDg")
292 for sn in self._subscribers:
293 s = self._subscribers[sn]
294 for (subscriber, info) in s.items():
295 # we correlate these by looking for a subscriber whose tubid
296 # matches this announcement
297 sub_tubid = subscriber.getRemoteTubID()
298 if sub_tubid == ann_tubid:
299 self.log(format="found a match, nodeid=%(nodeid)s",
301 level=log.NOISY, parent=lp2, umid="xsWs1A")
302 # found a match. Does it need info?
304 self.log(format="replacing info",
305 level=log.NOISY, parent=lp2, umid="m5kxwA")
307 s[subscriber] = (subscriber_info, info[1])
308 # and we don't remember or announce stub_clients beyond what we
309 # need to get the subscriber_info set up
311 def _get_subscriber_info_from_ann(self, ann): # for_v1
312 sinfo = { "version": ann["version"],
313 "nickname": ann["nickname"],
314 "app-versions": ann["app-versions"],
315 "my-version": ann["my-version"],
316 "oldest-supported": ann["oldest-supported"],
320 def remote_subscribe(self, subscriber, service_name): # for_v1
321 self.log("introducer: old (v1) subscription[%s] request at %s"
322 % (service_name, subscriber), umid="hJlGUg")
323 return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
326 def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
327 self.log("introducer: subscription[%s] request at %s"
328 % (service_name, subscriber), umid="U3uzLg")
329 return self.add_subscriber(subscriber, service_name, subscriber_info)
331 def add_subscriber(self, subscriber, service_name, subscriber_info):
332 self._debug_counts["inbound_subscribe"] += 1
333 if service_name not in self._subscribers:
334 self._subscribers[service_name] = {}
335 subscribers = self._subscribers[service_name]
336 if subscriber in subscribers:
337 self.log("but they're already subscribed, ignoring",
338 level=log.UNUSUAL, umid="Sy9EfA")
341 if not subscriber_info: # for_v1
342 # v1 clients don't provide subscriber_info, but they should
343 # publish a 'stub client' record which contains the same
344 # information. If we've already received this, it will be in
345 # self._stub_client_announcements
346 tubid = subscriber.getRemoteTubID()
347 if tubid in self._stub_client_announcements:
348 subscriber_info = self._stub_client_announcements[tubid]
350 subscribers[subscriber] = (subscriber_info, time.time())
352 self.log("introducer: unsubscribing[%s] %s" % (service_name,
355 subscribers.pop(subscriber, None)
356 subscriber.notifyOnDisconnect(_remove)
358 # now tell them about any announcements they're interested in
359 announcements = set( [ ann_t
360 for idx,(ann_t,canary,ann,when)
361 in self._announcements.items()
362 if idx[0] == service_name] )
364 self._debug_counts["outbound_message"] += 1
365 self._debug_counts["outbound_announcements"] += len(announcements)
366 self._debug_outstanding += 1
367 d = subscriber.callRemote("announce_v2", announcements)
368 d.addBoth(self._debug_retired)
369 d.addErrback(log.err,
370 format="subscriber errored during subscribe %(anns)s",
371 anns=announcements, facility="tahoe.introducer",
372 level=log.UNUSUAL, umid="mtZepQ")