import os.path
import textwrap
import time

from zope.interface import implements
from twisted.application import service
from foolscap.api import Referenceable

import allmydata
from allmydata import node
from allmydata.util import log, rrefutil
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.introducer.interfaces import \
     RIIntroducerPublisherAndSubscriberService_v2
from allmydata.introducer.common import convert_announcement_v1_to_v2, \
     convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
     get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
class FurlFileConflictError(Exception):
    """Raised when both the old public and the new private FURL files exist.

    The introducer cannot tell which of the two files holds the FURL the
    operator wants to keep, so it refuses to start instead of guessing.
    """
class IntroducerNode(node.Node):
    """Node that hosts an IntroducerService and, optionally, its web UI.

    On startup the node migrates a legacy public 'introducer.furl' file
    into the 'private' subdirectory of its base directory, then registers
    the introducer service with the Tub once the Tub is ready.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        """Set up the introducer service and, if configured, the web server.

        basedir: the node's base directory.
        """
        node.Node.__init__(self, basedir)
        # NOTE(review): this call was restored from the upstream file; drop
        # it if node.Node.__init__ already reads the configuration.
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        # only start the web server when a port has been configured
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and publish it at a stable FURL.

        Raises FurlFileConflictError if both the legacy public
        'introducer.furl' and 'private/introducer.furl' exist, because we
        cannot tell which one holds the FURL the operator wants.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The leading backslash keeps the first line indented like
                # the others so that dedent() actually strips the margin,
                # and basedir is interpolated into the %s placeholder.
                msg = textwrap.dedent("""\
                    This directory (%s) contains both an old public
                    'introducer.furl' file, and a new-style
                    'private/introducer.furl', so I cannot safely remove the old
                    one. Please make sure your desired FURL is in
                    private/introducer.furl, and remove the public file. If this
                    causes your Introducer's FURL to change, you need to inform
                    all grid members so they can update their tahoe.cfg.
                    """) % (self.basedir,)
                raise FurlFileConflictError(msg)
            # only the legacy file exists: move it to the private location
            os.rename(old_public_fn, private_fn)

        d = self.when_tub_ready()
        def _publish(res):
            # furlFile keeps the FURL in private_fn
            furl = self.tub.registerReference(introducerservice,
                                              furlFile=private_fn)
            self.log(" introducer is at %s" % furl, umid="qF2L9A")
            self.introducer_url = furl # for tests
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Start the introducer's web server.

        webport: strports string taken from the [node]web.port setting.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module scope, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static",
                                           "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir,
                                               base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        # attach the server to this node; otherwise 'ws' would be unused
        self.add_service(ws)
class WrapV1SubscriberInV2Interface: # for_v1
    """I wrap a RemoteReference that points at an old v1 subscriber, enabling
    it to be treated like a v2 subscriber.

    Equality and hashing are delegated to the wrapped reference, so a
    wrapper can stand in for the original as a dictionary key.
    """

    def __init__(self, original):
        self.original = original # also used for tests

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        # must agree with __eq__, so hash the wrapped reference
        return hash(self.original)

    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def callRemote(self, methname, *args, **kwargs):
        """Dispatch a v2 remote call to the matching wrap_<methname> method.

        Raises AttributeError when no translation exists for methname.
        """
        m = getattr(self, "wrap_" + methname)
        return m(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert v2 announcements to v1 and send them via 'announce'."""
        anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
        return self.original.callRemote("announce", set(anns_v1))

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)
class IntroducerService(service.MultiService, Referenceable):
    """Remotely-callable introducer: collects announcements from publishers
    and forwards them to the subscribers of the matching service name.

    Both the original (v1) and the signed (v2) protocols are accepted. v1
    traffic is converted to v2 form on the way in, and v1 subscribers are
    wrapped in WrapV1SubscriberInV2Interface.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    # NOTE(review): service name restored from the upstream file; confirm
    # that callers look this service up by name.
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions, to fill in the subscriber_info.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        # every counter that _publish/add_subscriber increments must be
        # present here, otherwise the '+= 1' raises KeyError
        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred pass-through that counts one outbound message as done."""
        self._debug_outstanding -= 1
        # hand the result (or Failure) on, so the errback added after this
        # callback still sees any failure
        return res

    def log(self, *args, **kwargs):
        """Log a message, defaulting the facility to this server's."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements"""
        announcements = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if ann["service-name"] == "stub_client":
                if not include_stub_clients:
                    continue
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        descriptors = []
        for service_name, subscriptions in self._subscribers.items():
            for rref, (subscriber_info, when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                descriptors.append(sd)
        return descriptors

    def remote_get_version(self):
        """Return the VERSION dictionary advertised by this introducer."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Accept a v1 announcement, convert it to v2 form and publish it."""
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Accept a v2 announcement together with the publisher's canary."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Run _publish, logging any exception before re-raising it.

        lp: parent log entry under which a failure is recorded.
        """
        try:
            self._publish(ann_t, canary, lp)
        except Exception:
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Record an announcement and forward it to interested subscribers.

        Exact duplicates are ignored. When the stored announcement has a
        seqnum, the new one replaces it only if it carries a larger integer
        seqnum. stub_client announcements are used only to fill in v1
        subscriber information and are not stored or forwarded.
        """
        self._debug_counts["inbound_message"] += 1
        # NOTE(review): the umid tag of this message was on a line lost
        # from this copy of the file; restore it from version control.
        self.log("introducer: announcement published: %s" % (ann_t,))
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Take only the old announcement dict. Unpacking the stored
            # canary into 'canary' would overwrite the new publisher's
            # canary, and the stale one would then be stored below.
            old_ann = old[2]
            if old_ann == ann:
                # NOTE(review): umid tag lost from this copy of the file.
                self.log("but we already knew it, ignoring", level=log.NOISY)
                self._debug_counts["inbound_duplicate"] += 1
                return
            if "seqnum" in old_ann:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], (int,long))):
                    self.log("not replacing old ann, no valid seqnum",
                             level=log.NOISY, umid="ySbaVw")
                    self._debug_counts["inbound_no_seqnum"] += 1
                    return
                if ann["seqnum"] <= old_ann["seqnum"]:
                    self.log("not replacing old ann, new seqnum is too old"
                             " (%s <= %s) (replay attack?)"
                             % (ann["seqnum"], old_ann["seqnum"]),
                             level=log.UNUSUAL, umid="sX7yqQ")
                    self._debug_counts["inbound_old_replay"] += 1
                    return
                # ok, seqnum is newer, allow replacement
            # NOTE(review): umid tag lost from this copy of the file.
            self.log("old announcement being updated", level=log.NOISY)
            self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        # TODO: canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Use a v1 stub_client announcement to fill in subscriber info."""
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscribers .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            for (subscriber, info) in s.items():
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # keep the original subscription timestamp
                        s[subscriber] = (subscriber_info, info[1])
        # and we don't remember or announce stub_clients beyond what we
        # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from a stub_client announcement.

        Raises KeyError if the announcement lacks one of the copied keys.
        """
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Subscribe a v1 client; it supplies no subscriber_info."""
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Subscribe a v2 client to announcements for service_name."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Register a subscriber and send it the announcements we hold.

        A subscriber that is already registered for service_name is
        ignored. The subscription is dropped again when the subscriber
        disconnects.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            if tubid in self._stub_client_announcements:
                subscriber_info = self._stub_client_announcements[tubid]

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            # NOTE(review): umid tag lost from this copy of the file.
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                          subscriber))
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        # nothing to send (and nothing to count) when we hold no matches
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")