]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
394fdf00e0add6d6a1b660495a49cdcd487d5188
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable
6 import allmydata
7 from allmydata import node
8 from allmydata.util import log, rrefutil
9 from allmydata.introducer.interfaces import \
10      RIIntroducerPublisherAndSubscriberService_v2
11 from allmydata.introducer.common import convert_announcement_v1_to_v2, \
12      convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
13      get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
14
class IntroducerNode(node.Node):
    """A node that hosts the introducer service.

    On construction it reads the node configuration, creates an
    IntroducerService and registers it with the Tub under the name
    "introducer" (writing the resulting FURL to 'introducer.furl'), and
    starts the introducer web server when [node]web.port is configured.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir="."):
        """Set up the node whose base directory is `basedir`."""
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        # web.port is an strports string; a missing/empty value means no
        # web server is started
        configured_port = self.get_config("node", "web.port", None)
        if configured_port:
            self.init_web(configured_port)

    def init_introducer(self):
        """Create the IntroducerService and, once the Tub is ready, register
        it and record its FURL.

        Failures while registering are reported through log.err rather than
        propagated.
        """
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        def _register_and_record(_ignored):
            furl = self.tub.registerReference(introducerservice,
                                              "introducer")
            self.introducer_url = furl
            self.log(" introducer is at %s" % furl,
                     umid="qF2L9A")
            self.write_config("introducer.furl", furl + "\n")

        ready = self.when_tub_ready()
        ready.addCallback(_register_and_record)
        ready.addErrback(log.err, facility="tahoe.init",
                         level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Start the introducer web server on `webport` (an strports
        string), serving static files from [node]web.static (default
        "public_html", with '~' expanded).
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        url_file = os.path.join(self.basedir, "node.url")
        static_root = os.path.expanduser(
            self.get_config("node", "web.static", "public_html"))
        webserver = IntroducerWebishServer(self, webport, url_file,
                                           static_root)
        self.add_service(webserver)
52
class WrapV1SubscriberInV2Interface: # for_v1
    """Adapter that lets an old v1 subscriber be driven through the v2
    subscriber interface.

    I hold a RemoteReference to the v1 subscriber in self.original.
    Equality, hashing and the connection-inspection methods are delegated
    straight to it. callRemote("NAME", ...) is dispatched to my own
    wrap_NAME method, which translates the v2 call into its v1 equivalent.
    """

    def __init__(self, original):
        # the wrapped v1 RemoteReference; tests also reach in for this
        self.original = original

    # --- identity: behave like the wrapped reference ---

    def __eq__(self, them):
        return self.original == them

    def __ne__(self, them):
        return self.original != them

    def __hash__(self):
        return hash(self.original)

    # --- plain pass-throughs to the wrapped reference ---

    def getRemoteTubID(self):
        return self.original.getRemoteTubID()

    def getSturdyRef(self):
        return self.original.getSturdyRef()

    def getPeer(self):
        return self.original.getPeer()

    def getLocationHints(self):
        return self.original.getLocationHints()

    def notifyOnDisconnect(self, *args, **kwargs):
        return self.original.notifyOnDisconnect(*args, **kwargs)

    # --- v2 -> v1 call translation ---

    def callRemote(self, methname, *args, **kwargs):
        """Invoke my wrap_<methname> translator with the given arguments.

        A method name with no matching wrap_ translator raises
        AttributeError from the getattr lookup.
        """
        translator = getattr(self, "wrap_" + methname)
        return translator(*args, **kwargs)

    def wrap_announce_v2(self, announcements):
        """Downgrade each v2 announcement to v1 and send the resulting set
        via the v1 "announce" method."""
        downgraded = set(convert_announcement_v2_to_v1(ann)
                         for ann in announcements)
        return self.original.callRemote("announce", downgraded)

    def wrap_set_encoding_parameters(self, parameters):
        # note: unused
        return self.original.callRemote("set_encoding_parameters", parameters)
85
86 class IntroducerService(service.MultiService, Referenceable):
87     implements(RIIntroducerPublisherAndSubscriberService_v2)
88     name = "introducer"
89     # v1 is the original protocol, supported since 1.0 (but only advertised
90     # starting in 1.3). v2 is the new signed protocol, supported after 1.9
91     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
92                 "http://allmydata.org/tahoe/protocols/introducer/v2": { },
93                 "application-version": str(allmydata.__full_version__),
94                 }
95
96     def __init__(self, basedir="."):
97         service.MultiService.__init__(self)
98         self.introducer_url = None
99         # 'index' is (service_name, key_s, tubid), where key_s or tubid is
100         # None
101         self._announcements = {} # dict of index ->
102                                  # (ann_t, canary, ann, timestamp)
103
104         # ann (the announcement dictionary) is cleaned up: nickname is always
105         # unicode, servicename is always ascii, etc, even though
106         # simplejson.loads sometimes returns either
107
108         # self._subscribers is a dict mapping servicename to subscriptions
109         # 'subscriptions' is a dict mapping rref to a subscription
110         # 'subscription' is a tuple of (subscriber_info, timestamp)
111         # 'subscriber_info' is a dict, provided directly for v2 clients, or
112         # synthesized for v1 clients. The expected keys are:
113         #  version, nickname, app-versions, my-version, oldest-supported
114         self._subscribers = {}
115
116         # self._stub_client_announcements contains the information provided
117         # by v1 clients. We stash this so we can match it up with their
118         # subscriptions.
119         self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
120
121         self._debug_counts = {"inbound_message": 0,
122                               "inbound_duplicate": 0,
123                               "inbound_update": 0,
124                               "outbound_message": 0,
125                               "outbound_announcements": 0,
126                               "inbound_subscribe": 0}
127         self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
128
129     def _debug_retired(self, res):
130         self._debug_outstanding -= 1
131         return res
132
133     def log(self, *args, **kwargs):
134         if "facility" not in kwargs:
135             kwargs["facility"] = "tahoe.introducer.server"
136         return log.msg(*args, **kwargs)
137
138     def get_announcements(self, include_stub_clients=True):
139         """Return a list of AnnouncementDescriptor for all announcements"""
140         announcements = []
141         for (index, (_, canary, ann, when)) in self._announcements.items():
142             if ann["service-name"] == "stub_client":
143                 if not include_stub_clients:
144                     continue
145             ad = AnnouncementDescriptor(when, index, canary, ann)
146             announcements.append(ad)
147         return announcements
148
149     def get_subscribers(self):
150         """Return a list of SubscriberDescriptor objects for all subscribers"""
151         s = []
152         for service_name, subscriptions in self._subscribers.items():
153             for rref,(subscriber_info,when) in subscriptions.items():
154                 # note that if the subscriber didn't do Tub.setLocation,
155                 # tubid will be None. Also, subscribers do not tell us which
156                 # pubkey they use; only publishers do that.
157                 tubid = rref.getRemoteTubID() or "?"
158                 advertised_addresses = rrefutil.hosts_for_rref(rref)
159                 remote_address = rrefutil.stringify_remote_address(rref)
160                 # these three assume subscriber_info["version"]==0, but
161                 # should tolerate other versions
162                 if not subscriber_info:
163                      # V1 clients that haven't yet sent their stub_info data
164                     subscriber_info = {}
165                 nickname = subscriber_info.get("nickname", u"?")
166                 version = subscriber_info.get("my-version", u"?")
167                 app_versions = subscriber_info.get("app-versions", {})
168                 # 'when' is the time they subscribed
169                 sd = SubscriberDescriptor(service_name, when,
170                                           nickname, version, app_versions,
171                                           advertised_addresses, remote_address,
172                                           tubid)
173                 s.append(sd)
174         return s
175
176     def remote_get_version(self):
177         return self.VERSION
178
179     def remote_publish(self, ann_t): # for_v1
180         lp = self.log("introducer: old (v1) announcement published: %s"
181                       % (ann_t,), umid="6zGOIw")
182         ann_v2 = convert_announcement_v1_to_v2(ann_t)
183         return self.publish(ann_v2, None, lp)
184
185     def remote_publish_v2(self, ann_t, canary):
186         lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
187         return self.publish(ann_t, canary, lp)
188
189     def publish(self, ann_t, canary, lp):
190         try:
191             self._publish(ann_t, canary, lp)
192         except:
193             log.err(format="Introducer.remote_publish failed on %(ann)s",
194                     ann=ann_t,
195                     level=log.UNUSUAL, parent=lp, umid="620rWA")
196             raise
197
198     def _publish(self, ann_t, canary, lp):
199         self._debug_counts["inbound_message"] += 1
200         self.log("introducer: announcement published: %s" % (ann_t,),
201                  umid="wKHgCw")
202         ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
203         index = make_index(ann, key)
204
205         service_name = str(ann["service-name"])
206         if service_name == "stub_client": # for_v1
207             self._attach_stub_client(ann, lp)
208             return
209
210         old = self._announcements.get(index)
211         if old:
212             (old_ann_t, canary, old_ann, timestamp) = old
213             if old_ann == ann:
214                 self.log("but we already knew it, ignoring", level=log.NOISY,
215                          umid="myxzLw")
216                 self._debug_counts["inbound_duplicate"] += 1
217                 return
218             else:
219                 self.log("old announcement being updated", level=log.NOISY,
220                          umid="304r9g")
221                 self._debug_counts["inbound_update"] += 1
222         self._announcements[index] = (ann_t, canary, ann, time.time())
223         #if canary:
224         #    canary.notifyOnDisconnect ...
225         # use a CanaryWatcher? with cw.is_connected()?
226         # actually we just want foolscap to give rref.is_connected(), since
227         # this is only for the status display
228
229         for s in self._subscribers.get(service_name, []):
230             self._debug_counts["outbound_message"] += 1
231             self._debug_counts["outbound_announcements"] += 1
232             self._debug_outstanding += 1
233             d = s.callRemote("announce_v2", set([ann_t]))
234             d.addBoth(self._debug_retired)
235             d.addErrback(log.err,
236                          format="subscriber errored on announcement %(ann)s",
237                          ann=ann_t, facility="tahoe.introducer",
238                          level=log.UNUSUAL, umid="jfGMXQ")
239
240     def _attach_stub_client(self, ann, lp):
241         # There might be a v1 subscriber for whom this is a stub_client.
242         # We might have received the subscription before the stub_client
243         # announcement, in which case we now need to fix up the record in
244         # self._subscriptions .
245
246         # record it for later, in case the stub_client arrived before the
247         # subscription
248         subscriber_info = self._get_subscriber_info_from_ann(ann)
249         ann_tubid = get_tubid_string_from_ann(ann)
250         self._stub_client_announcements[ann_tubid] = subscriber_info
251
252         lp2 = self.log("stub_client announcement, "
253                        "looking for matching subscriber",
254                        parent=lp, level=log.NOISY, umid="BTywDg")
255
256         for sn in self._subscribers:
257             s = self._subscribers[sn]
258             for (subscriber, info) in s.items():
259                 # we correlate these by looking for a subscriber whose tubid
260                 # matches this announcement
261                 sub_tubid = subscriber.getRemoteTubID()
262                 if sub_tubid == ann_tubid:
263                     self.log(format="found a match, nodeid=%(nodeid)s",
264                              nodeid=sub_tubid,
265                              level=log.NOISY, parent=lp2, umid="xsWs1A")
266                     # found a match. Does it need info?
267                     if not info[0]:
268                         self.log(format="replacing info",
269                                  level=log.NOISY, parent=lp2, umid="m5kxwA")
270                         # yup
271                         s[subscriber] = (subscriber_info, info[1])
272             # and we don't remember or announce stub_clients beyond what we
273             # need to get the subscriber_info set up
274
275     def _get_subscriber_info_from_ann(self, ann): # for_v1
276         sinfo = { "version": ann["version"],
277                   "nickname": ann["nickname"],
278                   "app-versions": ann["app-versions"],
279                   "my-version": ann["my-version"],
280                   "oldest-supported": ann["oldest-supported"],
281                   }
282         return sinfo
283
284     def remote_subscribe(self, subscriber, service_name): # for_v1
285         self.log("introducer: old (v1) subscription[%s] request at %s"
286                  % (service_name, subscriber), umid="hJlGUg")
287         return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
288                                    service_name, None)
289
290     def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
291         self.log("introducer: subscription[%s] request at %s"
292                  % (service_name, subscriber), umid="U3uzLg")
293         return self.add_subscriber(subscriber, service_name, subscriber_info)
294
295     def add_subscriber(self, subscriber, service_name, subscriber_info):
296         self._debug_counts["inbound_subscribe"] += 1
297         if service_name not in self._subscribers:
298             self._subscribers[service_name] = {}
299         subscribers = self._subscribers[service_name]
300         if subscriber in subscribers:
301             self.log("but they're already subscribed, ignoring",
302                      level=log.UNUSUAL, umid="Sy9EfA")
303             return
304
305         if not subscriber_info: # for_v1
306             # v1 clients don't provide subscriber_info, but they should
307             # publish a 'stub client' record which contains the same
308             # information. If we've already received this, it will be in
309             # self._stub_client_announcements
310             tubid = subscriber.getRemoteTubID()
311             if tubid in self._stub_client_announcements:
312                 subscriber_info = self._stub_client_announcements[tubid]
313
314         subscribers[subscriber] = (subscriber_info, time.time())
315         def _remove():
316             self.log("introducer: unsubscribing[%s] %s" % (service_name,
317                                                            subscriber),
318                      umid="vYGcJg")
319             subscribers.pop(subscriber, None)
320         subscriber.notifyOnDisconnect(_remove)
321
322         # now tell them about any announcements they're interested in
323         announcements = set( [ ann_t
324                                for idx,(ann_t,canary,ann,when)
325                                in self._announcements.items()
326                                if idx[0] == service_name] )
327         if announcements:
328             self._debug_counts["outbound_message"] += 1
329             self._debug_counts["outbound_announcements"] += len(announcements)
330             self._debug_outstanding += 1
331             d = subscriber.callRemote("announce_v2", announcements)
332             d.addBoth(self._debug_retired)
333             d.addErrback(log.err,
334                          format="subscriber errored during subscribe %(anns)s",
335                          anns=announcements, facility="tahoe.introducer",
336                          level=log.UNUSUAL, umid="mtZepQ")
337             return d