]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer.py
add a webserver for the Introducer, showing service announcements and subscriber...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer.py
1
2 import re, time, sha, os.path
3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap import Referenceable
7 from allmydata import node
8 from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
9      RIIntroducerSubscriberClient, IIntroducerClient
10 from allmydata.util import log, idlib
11
12 class IntroducerNode(node.Node):
13     PORTNUMFILE = "introducer.port"
14     NODETYPE = "introducer"
15
16     def __init__(self, basedir="."):
17         node.Node.__init__(self, basedir)
18         self.init_introducer()
19         webport = self.get_config("webport")
20         if webport:
21             self.init_web(webport) # strports string
22
23     def init_introducer(self):
24         introducerservice = IntroducerService(self.basedir)
25         self.add_service(introducerservice)
26
27         d = self.when_tub_ready()
28         def _publish(res):
29             self.introducer_url = self.tub.registerReference(introducerservice,
30                                                              "introducer")
31             self.log(" introducer is at %s" % self.introducer_url)
32             self.write_config("introducer.furl", self.introducer_url + "\n")
33         d.addCallback(_publish)
34         d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
35
36     def init_web(self, webport):
37         self.log("init_web(webport=%s)", args=(webport,))
38
39         from allmydata.webish import IntroducerWebishServer
40         nodeurl_path = os.path.join(self.basedir, "node.url")
41         ws = IntroducerWebishServer(webport, nodeurl_path)
42         self.add_service(ws)
43
44 class IntroducerService(service.MultiService, Referenceable):
45     implements(RIIntroducerPublisherAndSubscriberService)
46     name = "introducer"
47
48     def __init__(self, basedir="."):
49         service.MultiService.__init__(self)
50         self.introducer_url = None
51         self._announcements = set()
52         self._subscribers = {}
53
54     def log(self, *args, **kwargs):
55         if "facility" not in kwargs:
56             kwargs["facility"] = "tahoe.introducer"
57         return log.msg(*args, **kwargs)
58
59     def get_announcements(self):
60         return frozenset(self._announcements)
61     def get_subscribers(self):
62         return self._subscribers
63
64     def remote_publish(self, announcement):
65         self.log("introducer: announcement published: %s" % (announcement,) )
66         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
67         if announcement in self._announcements:
68             self.log("but we already knew it, ignoring", level=log.NOISY)
69             return
70         self._announcements.add(announcement)
71         for s in self._subscribers.get(service_name, []):
72             s.callRemote("announce", set([announcement]))
73
74     def remote_subscribe(self, subscriber, service_name):
75         self.log("introducer: subscription[%s] request at %s" % (service_name,
76                                                                  subscriber))
77         if service_name not in self._subscribers:
78             self._subscribers[service_name] = set()
79         subscribers = self._subscribers[service_name]
80         if subscriber in subscribers:
81             self.log("but they're already subscribed, ignoring",
82                      level=log.UNUSUAL)
83             return
84         subscribers.add(subscriber)
85         def _remove():
86             self.log("introducer: unsubscribing[%s] %s" % (service_name,
87                                                            subscriber))
88             subscribers.remove(subscriber)
89         subscriber.notifyOnDisconnect(_remove)
90
91         announcements = set( [ a
92                                for a in self._announcements
93                                if a[1] == service_name ] )
94         d = subscriber.callRemote("announce", announcements)
95         d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
96
97
98
99 class RemoteServiceConnector:
100     """I hold information about a peer service that we want to connect to. If
101     we are connected, I hold the RemoteReference, the peer's address, and the
102     peer's version information. I remember information about when we were
103     last connected to the peer too, even if we aren't currently connected.
104
105     @ivar announcement_time: when we first heard about this service
106     @ivar last_connect_time: when we last established a connection
107     @ivar last_loss_time: when we last lost a connection
108
109     @ivar version: the peer's version, from the most recent announcement
110     @ivar oldest_supported: the peer's oldest supported version, same
111     @ivar nickname: the peer's self-reported nickname, same
112
113     @ivar rref: the RemoteReference, if connected, otherwise None
114     @ivar remote_host: the IAddress, if connected, otherwise None
115     """
116
117     def __init__(self, announcement, tub, ic):
118         self._tub = tub
119         self._announcement = announcement
120         self._ic = ic
121         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
122
123         self._furl = furl
124         m = re.match(r'pb://(\w+)@', furl)
125         assert m
126         self._nodeid = b32decode(m.group(1).upper())
127         self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
128
129         self._index = (self._nodeid, service_name)
130         self.service_name = service_name
131
132         self.log("attempting to connect to %s" % self._nodeid_s)
133         self.announcement_time = time.time()
134         self.last_loss_time = None
135         self.rref = None
136         self.remote_host = None
137         self.last_connect_time = None
138         self.version = ver
139         self.oldest_supported = oldest
140         self.nickname = nickname
141
142     def log(self, *args, **kwargs):
143         return self._ic.log(*args, **kwargs)
144
145     def get_index(self):
146         return self._index
147
148     def startConnecting(self):
149         self._reconnector = self._tub.connectTo(self._furl, self._got_service)
150
151     def stopConnecting(self):
152         self._reconnector.stopConnecting()
153
154     def _got_service(self, rref):
155         self.last_connect_time = time.time()
156         self.remote_host = rref.tracker.broker.transport.getPeer()
157
158         self.rref = rref
159         self.log("connected to %s" % self._nodeid_s)
160
161         self._ic.add_connection(self._nodeid, self.service_name, rref)
162
163         rref.notifyOnDisconnect(self._lost, rref)
164
165     def _lost(self, rref):
166         self.log("lost connection to %s" % self._nodeid_s)
167         self.last_loss_time = time.time()
168         self.rref = None
169         self.remote_host = None
170         self._ic.remove_connection(self._nodeid, self.service_name, rref)
171
172
173
174 class IntroducerClient(service.Service, Referenceable):
175     implements(RIIntroducerSubscriberClient, IIntroducerClient)
176
177     def __init__(self, tub, introducer_furl,
178                  nickname, my_version, oldest_supported):
179         self._tub = tub
180         self.introducer_furl = introducer_furl
181
182         self._nickname = nickname
183         self._my_version = my_version
184         self._oldest_supported = oldest_supported
185
186         self._published_announcements = set()
187
188         self._publisher = None
189         self._connected = False
190
191         self._subscribed_service_names = set()
192         self._subscriptions = set() # requests we've actually sent
193         self._received_announcements = set()
194         # TODO: this set will grow without bound, until the node is restarted
195
196         # we only accept one announcement per (peerid+service_name) pair.
197         # This insures that an upgraded host replace their previous
198         # announcement. It also means that each peer must have their own Tub
199         # (no sharing), which is slightly weird but consistent with the rest
200         # of the Tahoe codebase.
201         self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
202         # self._connections is a set of (peerid, service_name, rref) tuples
203         self._connections = set()
204
205         self.counter = 0 # incremented each time we change state, for tests
206         self.encoding_parameters = None
207
208     def startService(self):
209         service.Service.startService(self)
210         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
211         self._introducer_reconnector = rc
212         def connect_failed(failure):
213             self.log("Initial Introducer connection failed: perhaps it's down",
214                      level=log.WEIRD, failure=failure)
215         d = self._tub.getReference(self.introducer_furl)
216         d.addErrback(connect_failed)
217
218     def _got_introducer(self, publisher):
219         self.log("connected to introducer")
220         self._connected = True
221         self._publisher = publisher
222         publisher.notifyOnDisconnect(self._disconnected)
223         self._maybe_publish()
224         self._maybe_subscribe()
225
226     def _disconnected(self):
227         self.log("bummer, we've lost our connection to the introducer")
228         self._connected = False
229         self._publisher = None
230         self._subscriptions.clear()
231
232     def stopService(self):
233         service.Service.stopService(self)
234         self._introducer_reconnector.stopConnecting()
235         for rsc in self._connectors.itervalues():
236             rsc.stopConnecting()
237
238     def log(self, *args, **kwargs):
239         if "facility" not in kwargs:
240             kwargs["facility"] = "tahoe.introducer"
241         return log.msg(*args, **kwargs)
242
243
244     def publish(self, furl, service_name, remoteinterface_name):
245         ann = (furl, service_name, remoteinterface_name,
246                self._nickname, self._my_version, self._oldest_supported)
247         self._published_announcements.add(ann)
248         self._maybe_publish()
249
250     def subscribe_to(self, service_name):
251         self._subscribed_service_names.add(service_name)
252         self._maybe_subscribe()
253
254     def _maybe_subscribe(self):
255         if not self._publisher:
256             self.log("want to subscribe, but no introducer yet",
257                      level=log.NOISY)
258             return
259         for service_name in self._subscribed_service_names:
260             if service_name not in self._subscriptions:
261                 # there is a race here, but the subscription desk ignores
262                 # duplicate requests.
263                 self._subscriptions.add(service_name)
264                 d = self._publisher.callRemote("subscribe", self, service_name)
265                 d.addErrback(log.err, facility="tahoe.introducer",
266                              level=log.WEIRD)
267
268     def _maybe_publish(self):
269         if not self._publisher:
270             self.log("want to publish, but no introducer yet", level=log.NOISY)
271             return
272         # this re-publishes everything. The Introducer ignores duplicates
273         for ann in self._published_announcements:
274             d = self._publisher.callRemote("publish", ann)
275             d.addErrback(log.err, facility="tahoe.introducer",
276                          level=log.WEIRD)
277
278
279
280     def remote_announce(self, announcements):
281         for ann in announcements:
282             self.log("received %d announcements" % len(announcements))
283             (furl, service_name, ri_name, nickname, ver, oldest) = ann
284             if service_name not in self._subscribed_service_names:
285                 self.log("announcement for a service we don't care about [%s]"
286                          % (service_name,), level=log.WEIRD)
287                 continue
288             if ann in self._received_announcements:
289                 self.log("ignoring old announcement: %s" % (ann,),
290                          level=log.NOISY)
291                 continue
292             self.log("new announcement[%s]: %s" % (service_name, ann))
293             self._received_announcements.add(ann)
294             self._new_announcement(ann)
295
296     def _new_announcement(self, announcement):
297         # this will only be called for new announcements
298         rsc = RemoteServiceConnector(announcement, self._tub, self)
299         index = rsc.get_index()
300         if index in self._connectors:
301             self._connectors[index].stopConnecting()
302         self._connectors[index] = rsc
303         rsc.startConnecting()
304
305     def add_connection(self, nodeid, service_name, rref):
306         self._connections.add( (nodeid, service_name, rref) )
307         self.counter += 1
308
309     def remove_connection(self, nodeid, service_name, rref):
310         self._connections.discard( (nodeid, service_name, rref) )
311         self.counter += 1
312
313
314     def get_all_connections(self):
315         return frozenset(self._connections)
316
317     def get_all_connectors(self):
318         return self._connectors.copy()
319
320     def get_all_peerids(self):
321         return frozenset([peerid
322                           for (peerid, service_name, rref)
323                           in self._connections])
324
325     def get_all_connections_for(self, service_name):
326         return frozenset([c
327                           for c in self._connections
328                           if c[1] == service_name])
329
330     def get_permuted_peers(self, service_name, key):
331         """Return an ordered list of (peerid, rref) tuples."""
332
333         results = []
334         for (c_peerid, c_service_name, rref) in self._connections:
335             assert isinstance(c_peerid, str)
336             if c_service_name != service_name:
337                 continue
338             permuted = sha.new(key + c_peerid).digest()
339             results.append((permuted, c_peerid, rref))
340
341         results.sort(lambda a,b: cmp(a[0], b[0]))
342         return [ (r[1], r[2]) for r in results ]
343
344
345
346     def remote_set_encoding_parameters(self, parameters):
347         self.encoding_parameters = parameters
348
349     def connected_to_introducer(self):
350         return self._connected
351
352     def debug_disconnect_from_peerid(self, victim_nodeid):
353         # for unit tests: locate and sever all connections to the given
354         # peerid.
355         for (nodeid, service_name, rref) in self._connections:
356             if nodeid == victim_nodeid:
357                 rref.tracker.broker.transport.loseConnection()