]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
7031c3af68b792e45651476342b736f14c782757
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path, textwrap
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
6 import allmydata
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.util.fileutil import abspath_expanduser_unicode
10 from allmydata.introducer.interfaces import \
11      RIIntroducerPublisherAndSubscriberService_v2
12 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
13      convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
14      get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
15
class FurlFileConflictError(Exception):
    """Raised when both the old public 'introducer.furl' and the new
    'private/introducer.furl' exist, so the old one cannot be migrated
    safely."""
18
class IntroducerNode(node.Node):
    """A node that runs an IntroducerService and, when configured, a web
    server for it."""
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        """Set up the node in `basedir`, read its configuration and start
        the introducer service.

        The web server is started only when the [node]web.port option has a
        non-empty value.
        """
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and register it with the Tub once
        the Tub is ready.

        An old-style public 'introducer.furl' in the base directory is moved
        to 'private/introducer.furl' before registration.

        Raises FurlFileConflictError if both the public and the private FURL
        files exist, since the old one cannot then be moved safely.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The literal starts with a line continuation so that every
                # line carries the same indentation; otherwise dedent() finds
                # no common margin and strips nothing. The directory is
                # interpolated with %r so the message stays plain ASCII even
                # when basedir is a unicode string with non-ASCII characters.
                msg = textwrap.dedent("""\
                This directory (%r) contains both an old public
                'introducer.furl' file, and a new-style
                'private/introducer.furl', so I cannot safely remove the old
                one. Please make sure your desired FURL is in
                private/introducer.furl, and remove the public file. If this
                causes your Introducer's FURL to change, you need to inform
                all grid members so they can update their tahoe.cfg.
                """) % (self.basedir,)
                raise FurlFileConflictError(msg)
            os.rename(old_public_fn, private_fn)
        d = self.when_tub_ready()
        def _publish(res):
            # registering with furlFile= ties the FURL to the private file
            furl = self.tub.registerReference(introducerservice,
                                              furlFile=private_fn)
            self.log(" introducer is at %s" % furl, umid="qF2L9A")
            self.introducer_url = furl # for tests
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Attach an IntroducerWebishServer listening on `webport`.

        `webport` is a strports string. The static-file directory comes from
        the [node]web.static option (default "public_html") and is passed
        through abspath_expanduser_unicode with the node's base directory as
        its base.
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
70
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter that presents a RemoteReference to an old v1 subscriber
    through the interface of a v2 subscriber.

    Equality, hashing and the connection accessors are delegated to the
    wrapped reference. callRemote() is routed to the wrap_<methname> method
    of this class, which issues the corresponding v1 call.
    """

    def __init__(self, original):
        self.original = original # also used for tests

    # --- identity: behave like the wrapped reference ---

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # --- plain delegation to the wrapped reference ---

    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)

    # --- v2 calls, translated into their v1 equivalents ---

    def callRemote(self, methname, *args, **kwargs):
        """Dispatch a v2 remote call to the wrap_* method named after it."""
        handler = getattr(self, "wrap_" + methname)
        return handler(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert each v2 announcement to v1 and send them as one set."""
        converted = set(convert_announcement_v2_to_v1(one_ann)
                        for one_ann in announcements)
        return self.original.callRemote("announce", converted)

    def wrap_set_encoding_parameters(self, parameters):
        # note: unused
        return self.original.callRemote("set_encoding_parameters", parameters)
103
class IntroducerService(service.MultiService, Referenceable):
    """The remotely-callable introducer: it remembers published
    announcements and forwards them to the subscribers of the matching
    service name.

    Both the original (v1) protocol and the signed (v2) protocol are
    accepted. v1 subscribers are wrapped in WrapV1SubscriberInV2Interface so
    that the rest of the code can treat every subscriber as v2.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        """Create an introducer with no announcements and no subscribers.

        `basedir` is accepted for callers that pass it but is not used here.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred callback: note that one outbound message has finished
        (successfully or not) and pass the result through unchanged."""
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """Call log.msg, defaulting the facility to
        "tahoe.introducer.server" when the caller does not give one."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements.

        Announcements whose service-name is "stub_client" are left out when
        `include_stub_clients` is false.
        """
        announcements = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if ann["service-name"] == "stub_client":
                if not include_stub_clients:
                    continue
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref,(subscriber_info,when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                advertised_addresses = rrefutil.hosts_for_rref(rref)
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          advertised_addresses, remote_address,
                                          tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Return the VERSION dictionary advertised by this introducer."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Accept a v1 announcement: convert it to v2 form and publish it
        with no canary."""
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Accept a v2 announcement together with the publisher's canary."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Process one announcement; log any failure and re-raise it.

        `lp` is the parent log entry under which a failure is recorded.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # log-and-reraise: nothing is swallowed here
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Record the announcement `ann_t` and send it to the current
        subscribers of its service.

        stub_client announcements are only used to fill in v1 subscriber
        info and are neither stored nor forwarded. An announcement identical
        to the stored one is ignored. When the stored announcement has a
        seqnum, a replacement is accepted only if it carries a larger integer
        seqnum.
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Unpack into names of their own. Binding the stored canary to
            # 'canary' would clobber the parameter, and the record written
            # below would then keep the previous canary instead of the one
            # that came with this announcement.
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            else:
                if "seqnum" in old_ann:
                    # must beat previous sequence number to replace
                    if ("seqnum" not in ann
                        or not isinstance(ann["seqnum"], (int,long))):
                        self.log("not replacing old ann, no valid seqnum",
                                 level=log.NOISY, umid="ySbaVw")
                        self._debug_counts["inbound_no_seqnum"] += 1
                        return
                    if ann["seqnum"] <= old_ann["seqnum"]:
                        self.log("not replacing old ann, new seqnum is too old"
                                 " (%s <= %s) (replay attack?)"
                                 % (ann["seqnum"], old_ann["seqnum"]),
                                 level=log.UNUSUAL, umid="sX7yqQ")
                        self._debug_counts["inbound_old_replay"] += 1
                        return
                    # ok, seqnum is newer, allow replacement
                self.log("old announcement being updated", level=log.NOISY,
                         umid="304r9g")
                self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Remember the subscriber info carried by a v1 stub_client
        announcement and apply it to any already-subscribed rref with the
        same tubid that has no info yet."""
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscribers .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            for (subscriber, info) in s.items():
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # yup: keep the original subscription timestamp
                        s[subscriber] = (subscriber_info, info[1])
            # and we don't remember or announce stub_clients beyond what we
            # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from the fields of a stub_client
        announcement. Raises KeyError if one of the fields is absent."""
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Subscribe a v1 client: wrap its reference so it looks like a v2
        subscriber and register it without subscriber_info."""
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Subscribe a v2 client, which supplies its own subscriber_info."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Register `subscriber` for `service_name` and send it the
        announcements already known for that service.

        A subscriber that is already registered for the service is ignored.
        The subscription is dropped when the subscriber disconnects.

        Returns the Deferred of the initial "announce_v2" call when there
        was something to send, otherwise None.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            if tubid in self._stub_client_announcements:
                subscriber_info = self._stub_client_announcements[tubid]

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d