]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
92c2497e43ebaadcf6fada52cce4a4e48459a118
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path, textwrap
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
6 import allmydata
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.util.fileutil import abspath_expanduser_unicode
10 from allmydata.introducer.interfaces import \
11      RIIntroducerPublisherAndSubscriberService_v2
12 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
13      convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
14      get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
15
class FurlFileConflictError(Exception):
    """Raised when an introducer's base directory holds both a legacy public
    'introducer.furl' and a 'private/introducer.furl', so neither can be
    safely discarded automatically."""
18
class IntroducerNode(node.Node):
    """A node that runs the introducer service and, when [node]web.port is
    configured, an introducer web server."""
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and register it with our Tub.

        A legacy BASEDIR/introducer.furl is first moved to
        BASEDIR/private/introducer.furl. Once the Tub is ready, the service
        is registered with furlFile=private/introducer.furl; the resulting
        FURL is logged and kept in self.introducer_url (used by tests).
        Registration failures are logged rather than raised.

        :raises FurlFileConflictError: if both the legacy public file and
            private/introducer.furl already exist.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The backslash after the opening quotes puts every line of
                # the template at the same indentation, so textwrap.dedent()
                # really strips it. The basedir is then substituted into the
                # '%s' (which was previously left unfilled).
                msg = """\
                This directory (%s) contains both an old public
                'introducer.furl' file, and a new-style
                'private/introducer.furl', so I cannot safely remove the old
                one. Please make sure your desired FURL is in
                private/introducer.furl, and remove the public file. If this
                causes your Introducer's FURL to change, you need to inform
                all grid members so they can update their tahoe.cfg.
                """
                raise FurlFileConflictError(textwrap.dedent(msg)
                                            % (self.basedir,))
            os.rename(old_public_fn, private_fn)
        d = self.when_tub_ready()
        def _publish(res):
            furl = self.tub.registerReference(introducerservice,
                                              furlFile=private_fn)
            self.log(" introducer is at %s" % furl, umid="qF2L9A")
            self.introducer_url = furl # for tests
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Start an IntroducerWebishServer listening on 'webport' (a strports
        string).

        The server is given BASEDIR/node.url as its node-URL path. Its static
        directory comes from [node]web.static (default "public_html") and is
        resolved relative to the base directory.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
70
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter that lets a RemoteReference to an old v1 subscriber stand in
    wherever a v2 subscriber is expected.

    Equality, inequality, and hashing defer to the wrapped reference, so the
    wrapper and the raw reference behave as the same dict key. The
    RemoteReference accessors are forwarded unchanged. callRemote(name, ...)
    is dispatched to the local method 'wrap_<name>', which issues the
    equivalent v1 call. An unknown name raises AttributeError.
    """

    def __init__(self, original):
        # kept as a public attribute: also used for tests
        self.original = original

    # comparison and hashing mirror the wrapped reference
    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # plain pass-throughs to the wrapped RemoteReference
    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)

    def callRemote(self, methname, *args, **kwargs):
        """Route a v2-style remote call to the matching wrap_* translator."""
        translator = getattr(self, "wrap_" + methname)
        return translator(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        # v1 subscribers only know 'announce', taking a set of v1 announcements
        converted = set(convert_announcement_v2_to_v1(one)
                        for one in announcements)
        return self.original.callRemote("announce", converted)

    def wrap_set_encoding_parameters(self, parameters):
        # note: unused
        return self.original.callRemote("set_encoding_parameters", parameters)
100         return self.original.callRemote("set_encoding_parameters", parameters)
101     def notifyOnDisconnect(self, *args, **kwargs):
102         return self.original.notifyOnDisconnect(*args, **kwargs)
103
104 class IntroducerService(service.MultiService, Referenceable):
105     implements(RIIntroducerPublisherAndSubscriberService_v2)
106     name = "introducer"
107     # v1 is the original protocol, supported since 1.0 (but only advertised
108     # starting in 1.3). v2 is the new signed protocol, supported after 1.9
109     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
110                 "http://allmydata.org/tahoe/protocols/introducer/v2": { },
111                 "application-version": str(allmydata.__full_version__),
112                 }
113
114     def __init__(self, basedir="."):
115         service.MultiService.__init__(self)
116         self.introducer_url = None
117         # 'index' is (service_name, key_s, tubid), where key_s or tubid is
118         # None
119         self._announcements = {} # dict of index ->
120                                  # (ann_t, canary, ann, timestamp)
121
122         # ann (the announcement dictionary) is cleaned up: nickname is always
123         # unicode, servicename is always ascii, etc, even though
124         # simplejson.loads sometimes returns either
125
126         # self._subscribers is a dict mapping servicename to subscriptions
127         # 'subscriptions' is a dict mapping rref to a subscription
128         # 'subscription' is a tuple of (subscriber_info, timestamp)
129         # 'subscriber_info' is a dict, provided directly for v2 clients, or
130         # synthesized for v1 clients. The expected keys are:
131         #  version, nickname, app-versions, my-version, oldest-supported
132         self._subscribers = {}
133
134         # self._stub_client_announcements contains the information provided
135         # by v1 clients. We stash this so we can match it up with their
136         # subscriptions.
137         self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
138
139         self._debug_counts = {"inbound_message": 0,
140                               "inbound_duplicate": 0,
141                               "inbound_no_seqnum": 0,
142                               "inbound_old_replay": 0,
143                               "inbound_update": 0,
144                               "outbound_message": 0,
145                               "outbound_announcements": 0,
146                               "inbound_subscribe": 0}
147         self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
148
149     def _debug_retired(self, res):
150         self._debug_outstanding -= 1
151         return res
152
153     def log(self, *args, **kwargs):
154         if "facility" not in kwargs:
155             kwargs["facility"] = "tahoe.introducer.server"
156         return log.msg(*args, **kwargs)
157
158     def get_announcements(self, include_stub_clients=True):
159         """Return a list of AnnouncementDescriptor for all announcements"""
160         announcements = []
161         for (index, (_, canary, ann, when)) in self._announcements.items():
162             if ann["service-name"] == "stub_client":
163                 if not include_stub_clients:
164                     continue
165             ad = AnnouncementDescriptor(when, index, canary, ann)
166             announcements.append(ad)
167         return announcements
168
169     def get_subscribers(self):
170         """Return a list of SubscriberDescriptor objects for all subscribers"""
171         s = []
172         for service_name, subscriptions in self._subscribers.items():
173             for rref,(subscriber_info,when) in subscriptions.items():
174                 # note that if the subscriber didn't do Tub.setLocation,
175                 # tubid will be None. Also, subscribers do not tell us which
176                 # pubkey they use; only publishers do that.
177                 tubid = rref.getRemoteTubID() or "?"
178                 remote_address = rrefutil.stringify_remote_address(rref)
179                 # these three assume subscriber_info["version"]==0, but
180                 # should tolerate other versions
181                 if not subscriber_info:
182                      # V1 clients that haven't yet sent their stub_info data
183                     subscriber_info = {}
184                 nickname = subscriber_info.get("nickname", u"?")
185                 version = subscriber_info.get("my-version", u"?")
186                 app_versions = subscriber_info.get("app-versions", {})
187                 # 'when' is the time they subscribed
188                 sd = SubscriberDescriptor(service_name, when,
189                                           nickname, version, app_versions,
190                                           remote_address, tubid)
191                 s.append(sd)
192         return s
193
194     def remote_get_version(self):
195         return self.VERSION
196
197     def remote_publish(self, ann_t): # for_v1
198         lp = self.log("introducer: old (v1) announcement published: %s"
199                       % (ann_t,), umid="6zGOIw")
200         ann_v2 = convert_announcement_v1_to_v2(ann_t)
201         return self.publish(ann_v2, None, lp)
202
203     def remote_publish_v2(self, ann_t, canary):
204         lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
205         return self.publish(ann_t, canary, lp)
206
207     def publish(self, ann_t, canary, lp):
208         try:
209             self._publish(ann_t, canary, lp)
210         except:
211             log.err(format="Introducer.remote_publish failed on %(ann)s",
212                     ann=ann_t,
213                     level=log.UNUSUAL, parent=lp, umid="620rWA")
214             raise
215
216     def _publish(self, ann_t, canary, lp):
217         self._debug_counts["inbound_message"] += 1
218         self.log("introducer: announcement published: %s" % (ann_t,),
219                  umid="wKHgCw")
220         ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
221         index = make_index(ann, key)
222
223         service_name = str(ann["service-name"])
224         if service_name == "stub_client": # for_v1
225             self._attach_stub_client(ann, lp)
226             return
227
228         old = self._announcements.get(index)
229         if old:
230             (old_ann_t, canary, old_ann, timestamp) = old
231             if old_ann == ann:
232                 self.log("but we already knew it, ignoring", level=log.NOISY,
233                          umid="myxzLw")
234                 self._debug_counts["inbound_duplicate"] += 1
235                 return
236             else:
237                 if "seqnum" in old_ann:
238                     # must beat previous sequence number to replace
239                     if ("seqnum" not in ann
240                         or not isinstance(ann["seqnum"], (int,long))):
241                         self.log("not replacing old ann, no valid seqnum",
242                                  level=log.NOISY, umid="ySbaVw")
243                         self._debug_counts["inbound_no_seqnum"] += 1
244                         return
245                     if ann["seqnum"] <= old_ann["seqnum"]:
246                         self.log("not replacing old ann, new seqnum is too old"
247                                  " (%s <= %s) (replay attack?)"
248                                  % (ann["seqnum"], old_ann["seqnum"]),
249                                  level=log.UNUSUAL, umid="sX7yqQ")
250                         self._debug_counts["inbound_old_replay"] += 1
251                         return
252                     # ok, seqnum is newer, allow replacement
253                 self.log("old announcement being updated", level=log.NOISY,
254                          umid="304r9g")
255                 self._debug_counts["inbound_update"] += 1
256         self._announcements[index] = (ann_t, canary, ann, time.time())
257         #if canary:
258         #    canary.notifyOnDisconnect ...
259         # use a CanaryWatcher? with cw.is_connected()?
260         # actually we just want foolscap to give rref.is_connected(), since
261         # this is only for the status display
262
263         for s in self._subscribers.get(service_name, []):
264             self._debug_counts["outbound_message"] += 1
265             self._debug_counts["outbound_announcements"] += 1
266             self._debug_outstanding += 1
267             d = s.callRemote("announce_v2", set([ann_t]))
268             d.addBoth(self._debug_retired)
269             d.addErrback(log.err,
270                          format="subscriber errored on announcement %(ann)s",
271                          ann=ann_t, facility="tahoe.introducer",
272                          level=log.UNUSUAL, umid="jfGMXQ")
273
274     def _attach_stub_client(self, ann, lp):
275         # There might be a v1 subscriber for whom this is a stub_client.
276         # We might have received the subscription before the stub_client
277         # announcement, in which case we now need to fix up the record in
278         # self._subscriptions .
279
280         # record it for later, in case the stub_client arrived before the
281         # subscription
282         subscriber_info = self._get_subscriber_info_from_ann(ann)
283         ann_tubid = get_tubid_string_from_ann(ann)
284         self._stub_client_announcements[ann_tubid] = subscriber_info
285
286         lp2 = self.log("stub_client announcement, "
287                        "looking for matching subscriber",
288                        parent=lp, level=log.NOISY, umid="BTywDg")
289
290         for sn in self._subscribers:
291             s = self._subscribers[sn]
292             for (subscriber, info) in s.items():
293                 # we correlate these by looking for a subscriber whose tubid
294                 # matches this announcement
295                 sub_tubid = subscriber.getRemoteTubID()
296                 if sub_tubid == ann_tubid:
297                     self.log(format="found a match, nodeid=%(nodeid)s",
298                              nodeid=sub_tubid,
299                              level=log.NOISY, parent=lp2, umid="xsWs1A")
300                     # found a match. Does it need info?
301                     if not info[0]:
302                         self.log(format="replacing info",
303                                  level=log.NOISY, parent=lp2, umid="m5kxwA")
304                         # yup
305                         s[subscriber] = (subscriber_info, info[1])
306             # and we don't remember or announce stub_clients beyond what we
307             # need to get the subscriber_info set up
308
309     def _get_subscriber_info_from_ann(self, ann): # for_v1
310         sinfo = { "version": ann["version"],
311                   "nickname": ann["nickname"],
312                   "app-versions": ann["app-versions"],
313                   "my-version": ann["my-version"],
314                   "oldest-supported": ann["oldest-supported"],
315                   }
316         return sinfo
317
318     def remote_subscribe(self, subscriber, service_name): # for_v1
319         self.log("introducer: old (v1) subscription[%s] request at %s"
320                  % (service_name, subscriber), umid="hJlGUg")
321         return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
322                                    service_name, None)
323
324     def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
325         self.log("introducer: subscription[%s] request at %s"
326                  % (service_name, subscriber), umid="U3uzLg")
327         return self.add_subscriber(subscriber, service_name, subscriber_info)
328
329     def add_subscriber(self, subscriber, service_name, subscriber_info):
330         self._debug_counts["inbound_subscribe"] += 1
331         if service_name not in self._subscribers:
332             self._subscribers[service_name] = {}
333         subscribers = self._subscribers[service_name]
334         if subscriber in subscribers:
335             self.log("but they're already subscribed, ignoring",
336                      level=log.UNUSUAL, umid="Sy9EfA")
337             return
338
339         if not subscriber_info: # for_v1
340             # v1 clients don't provide subscriber_info, but they should
341             # publish a 'stub client' record which contains the same
342             # information. If we've already received this, it will be in
343             # self._stub_client_announcements
344             tubid = subscriber.getRemoteTubID()
345             if tubid in self._stub_client_announcements:
346                 subscriber_info = self._stub_client_announcements[tubid]
347
348         subscribers[subscriber] = (subscriber_info, time.time())
349         def _remove():
350             self.log("introducer: unsubscribing[%s] %s" % (service_name,
351                                                            subscriber),
352                      umid="vYGcJg")
353             subscribers.pop(subscriber, None)
354         subscriber.notifyOnDisconnect(_remove)
355
356         # now tell them about any announcements they're interested in
357         announcements = set( [ ann_t
358                                for idx,(ann_t,canary,ann,when)
359                                in self._announcements.items()
360                                if idx[0] == service_name] )
361         if announcements:
362             self._debug_counts["outbound_message"] += 1
363             self._debug_counts["outbound_announcements"] += len(announcements)
364             self._debug_outstanding += 1
365             d = subscriber.callRemote("announce_v2", announcements)
366             d.addBoth(self._debug_retired)
367             d.addErrback(log.err,
368                          format="subscriber errored during subscribe %(anns)s",
369                          anns=announcements, facility="tahoe.introducer",
370                          level=log.UNUSUAL, umid="mtZepQ")
371             return d