]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
remove introducer's set_encoding_parameters
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path, textwrap
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
6 import allmydata
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.util.fileutil import abspath_expanduser_unicode
10 from allmydata.introducer.interfaces import \
11      RIIntroducerPublisherAndSubscriberService_v2
12 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
13      convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
14      get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
15
class FurlFileConflictError(Exception):
    """Raised when a node directory holds both the old public
    'introducer.furl' and the new 'private/introducer.furl', so the old
    file cannot safely be moved into place."""
18
class IntroducerNode(node.Node):
    """A node whose job is to host an IntroducerService.

    On construction it reads its configuration, registers an
    IntroducerService with its Tub (persisting the resulting FURL in
    'private/introducer.furl'), and starts the introducer web server when
    the "web.port" option of the "node" config section is set.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir=u"."):
        """Set up the node in 'basedir' and start its services."""
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService and publish it once the Tub is ready.

        An old-style public 'introducer.furl' is migrated to
        'private/introducer.furl'. If both files exist, nothing is moved and
        FurlFileConflictError is raised, because there is no way to tell
        which FURL the operator wants to keep.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        old_public_fn = os.path.join(self.basedir, u"introducer.furl")
        private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # The leading backslash keeps the first line of the literal
                # at the same indentation as the others; otherwise the
                # common margin is empty and dedent() strips nothing. The
                # directory name is interpolated after dedenting so that its
                # contents cannot influence the margin computation.
                msg = textwrap.dedent("""\
                This directory (%s) contains both an old public
                'introducer.furl' file, and a new-style
                'private/introducer.furl', so I cannot safely remove the old
                one. Please make sure your desired FURL is in
                private/introducer.furl, and remove the public file. If this
                causes your Introducer's FURL to change, you need to inform
                all grid members so they can update their tahoe.cfg.
                """) % (self.basedir,)
                raise FurlFileConflictError(msg)
            os.rename(old_public_fn, private_fn)
        d = self.when_tub_ready()
        def _publish(res):
            # registering with furlFile= makes the Tub persist the FURL in
            # (and reuse it from) the private file
            furl = self.tub.registerReference(introducerservice,
                                              furlFile=private_fn)
            self.log(" introducer is at %s" % furl, umid="qF2L9A")
            self.introducer_url = furl # for tests
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init",
                     level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Start the introducer's web server on 'webport' (a strports
        string), serving static files from the configured "web.static"
        directory (default "public_html", resolved relative to basedir)."""
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original code
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
        staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        self.add_service(ws)
70
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter that makes a RemoteReference to an old v1 subscriber usable
    wherever a v2 subscriber is expected.

    Comparison, hashing and the connection-related accessors are passed
    straight through to the wrapped reference. callRemote() is routed to a
    local 'wrap_<methname>' method which translates the v2 call into the
    equivalent v1 call.
    """

    def __init__(self, original):
        # the wrapped v1 reference; tests also look at this attribute
        self.original = original

    # --- identity: behave like the wrapped reference ---

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # --- plain pass-through accessors ---

    def getRemoteTubID(self):
        wrapped = self.original
        return wrapped.getRemoteTubID()

    def getSturdyRef(self):
        wrapped = self.original
        return wrapped.getSturdyRef()

    def getPeer(self):
        wrapped = self.original
        return wrapped.getPeer()

    def getLocationHints(self):
        wrapped = self.original
        return wrapped.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        wrapped = self.original
        return wrapped.notifyOnDisconnect(*args, **kwargs)

    # --- v2 -> v1 call translation ---

    def callRemote(self, methname, *args, **kwargs):
        """Invoke the local translator named 'wrap_' + methname with the
        given arguments and return its result."""
        translator = getattr(self, "wrap_" + methname)
        return translator(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Convert each v2 announcement to v1 form and deliver the
        resulting set through the old subscriber's "announce" method."""
        converted = []
        for ann_v2 in announcements:
            converted.append(convert_announcement_v2_to_v1(ann_v2))
        return self.original.callRemote("announce", set(converted))
100
class IntroducerService(service.MultiService, Referenceable):
    """Remotely-callable service that records announcements from publishers
    and relays them to the subscribers of the matching service name.

    Both the original (v1) and the signed (v2) protocol entry points are
    provided; the v1 entry points adapt their arguments and then share the
    v2 code paths (publish() and add_subscriber()).
    """
    implements(RIIntroducerPublisherAndSubscriberService_v2)
    name = "introducer"
    # v1 is the original protocol, supported since 1.0 (but only advertised
    # starting in 1.3). v2 is the new signed protocol, supported after 1.9
    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
                "http://allmydata.org/tahoe/protocols/introducer/v2": { },
                "application-version": str(allmydata.__full_version__),
                }

    def __init__(self, basedir="."):
        """Initialize empty announcement/subscriber tables and counters.

        'basedir' is accepted for callers that pass it, but is not used by
        this method.
        """
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key_s, tubid), where key_s or tubid is
        # None
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly for v2 clients, or
        # synthesized for v1 clients. The expected keys are:
        #  version, nickname, app-versions, my-version, oldest-supported
        self._subscribers = {}

        # self._stub_client_announcements contains the information provided
        # by v1 clients. We stash this so we can match it up with their
        # subscriptions.
        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface

    def _debug_retired(self, res):
        """Deferred pass-through: note that one outbound message finished
        (successfully or not) and hand the result on unchanged."""
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting 'facility' to
        "tahoe.introducer.server" when the caller did not give one."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return a list of AnnouncementDescriptor for all announcements.

        Records whose service-name is "stub_client" are left out when
        'include_stub_clients' is false.
        """
        announcements = []
        for (index, (_, canary, ann, when)) in self._announcements.items():
            if (ann["service-name"] == "stub_client"
                and not include_stub_clients):
                continue
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in self._subscribers.items():
            for rref,(subscriber_info,when) in subscriptions.items():
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = rrefutil.stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                if not subscriber_info:
                    # V1 clients that haven't yet sent their stub_info data
                    subscriber_info = {}
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Return the VERSION dictionary describing supported protocols."""
        return self.VERSION

    def remote_publish(self, ann_t): # for_v1
        """Accept a v1 announcement: convert it to v2 form and publish it
        with no canary."""
        lp = self.log("introducer: old (v1) announcement published: %s"
                      % (ann_t,), umid="6zGOIw")
        ann_v2 = convert_announcement_v1_to_v2(ann_t)
        return self.publish(ann_v2, None, lp)

    def remote_publish_v2(self, ann_t, canary):
        """Accept a v2 announcement together with the publisher's canary."""
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """Process one announcement via _publish().

        Any exception is logged (with the offending announcement, under
        log parent 'lp') and then re-raised, so nothing is swallowed.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """Record announcement 'ann_t' and relay it to interested subscribers.

        stub_client announcements are only used to fill in subscriber info.
        An announcement identical to the stored one is ignored. When the
        stored announcement has a seqnum, a replacement is accepted only if
        it carries an integer seqnum that is strictly greater.
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
        index = make_index(ann, key)

        service_name = str(ann["service-name"])
        if service_name == "stub_client": # for_v1
            self._attach_stub_client(ann, lp)
            return

        old = self._announcements.get(index)
        if old:
            # Unpack into distinct names: the 'canary' argument must not be
            # rebound here, because it is what gets stored alongside the
            # new announcement below. Only old_ann is needed from the old
            # record.
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            else:
                if "seqnum" in old_ann:
                    # must beat previous sequence number to replace
                    if ("seqnum" not in ann
                        or not isinstance(ann["seqnum"], (int,long))):
                        self.log("not replacing old ann, no valid seqnum",
                                 level=log.NOISY, umid="ySbaVw")
                        self._debug_counts["inbound_no_seqnum"] += 1
                        return
                    if ann["seqnum"] <= old_ann["seqnum"]:
                        self.log("not replacing old ann, new seqnum is too old"
                                 " (%s <= %s) (replay attack?)"
                                 % (ann["seqnum"], old_ann["seqnum"]),
                                 level=log.UNUSUAL, umid="sX7yqQ")
                        self._debug_counts["inbound_old_replay"] += 1
                        return
                    # ok, seqnum is newer, allow replacement
                self.log("old announcement being updated", level=log.NOISY,
                         umid="304r9g")
                self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        # iterating the subscriptions dict yields the subscriber references
        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def _attach_stub_client(self, ann, lp):
        """Use a v1 'stub_client' announcement to fill in subscriber info.

        The info is remembered by tubid for subscriptions that arrive later,
        and is also applied to any existing subscription from the same tubid
        that has no info yet.
        """
        # There might be a v1 subscriber for whom this is a stub_client.
        # We might have received the subscription before the stub_client
        # announcement, in which case we now need to fix up the record in
        # self._subscribers .

        # record it for later, in case the stub_client arrived before the
        # subscription
        subscriber_info = self._get_subscriber_info_from_ann(ann)
        ann_tubid = get_tubid_string_from_ann(ann)
        self._stub_client_announcements[ann_tubid] = subscriber_info

        lp2 = self.log("stub_client announcement, "
                       "looking for matching subscriber",
                       parent=lp, level=log.NOISY, umid="BTywDg")

        for sn in self._subscribers:
            s = self._subscribers[sn]
            for (subscriber, info) in s.items():
                # we correlate these by looking for a subscriber whose tubid
                # matches this announcement
                sub_tubid = subscriber.getRemoteTubID()
                if sub_tubid == ann_tubid:
                    self.log(format="found a match, nodeid=%(nodeid)s",
                             nodeid=sub_tubid,
                             level=log.NOISY, parent=lp2, umid="xsWs1A")
                    # found a match. Does it need info?
                    if not info[0]:
                        self.log(format="replacing info",
                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
                        # yup: keep the original subscription timestamp
                        s[subscriber] = (subscriber_info, info[1])
        # and we don't remember or announce stub_clients beyond what we
        # need to get the subscriber_info set up

    def _get_subscriber_info_from_ann(self, ann): # for_v1
        """Build a subscriber_info dict from the fields of a stub_client
        announcement. Raises KeyError if an expected field is absent."""
        sinfo = { "version": ann["version"],
                  "nickname": ann["nickname"],
                  "app-versions": ann["app-versions"],
                  "my-version": ann["my-version"],
                  "oldest-supported": ann["oldest-supported"],
                  }
        return sinfo

    def remote_subscribe(self, subscriber, service_name): # for_v1
        """Accept a v1 subscription: wrap the subscriber so it can be
        driven through the v2 interface, and register it without
        subscriber_info."""
        self.log("introducer: old (v1) subscription[%s] request at %s"
                 % (service_name, subscriber), umid="hJlGUg")
        return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
                                   service_name, None)

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """Accept a v2 subscription for 'service_name'."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber), umid="U3uzLg")
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """Register 'subscriber' for 'service_name' and send it the
        announcements already known for that service.

        A repeated subscription is logged and ignored. The subscription is
        dropped again when the subscriber disconnects. Returns the Deferred
        of the initial announce_v2 call, or None when nothing was sent.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        if not subscriber_info: # for_v1
            # v1 clients don't provide subscriber_info, but they should
            # publish a 'stub client' record which contains the same
            # information. If we've already received this, it will be in
            # self._stub_client_announcements
            tubid = subscriber.getRemoteTubID()
            if tubid in self._stub_client_announcements:
                subscriber_info = self._stub_client_announcements[tubid]

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,canary,ann,when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d