2 import re, time, sha, os.path
3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap import Referenceable
7 from allmydata import node
8 from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
9 RIIntroducerSubscriberClient, IIntroducerClient
10 from allmydata.util import log, idlib
class IntroducerNode(node.Node):
    """A Node that hosts an IntroducerService and, optionally, a web server.

    Once the Tub is ready, the introducer service is registered with it and
    the resulting FURL is stored in self.introducer_url and written to the
    "introducer.furl" config file.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"

    def __init__(self, basedir="."):
        """Initialize the node rooted at `basedir`.

        Starts the introducer service unconditionally, and the web server
        only when a "webport" config value is present.
        """
        node.Node.__init__(self, basedir)
        self.init_introducer()
        webport = self.get_config("webport")
        # Only start the web server when a port was actually configured.
        # NOTE(review): assumes get_config() returns a false value when the
        # key is absent -- confirm against node.Node.
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """Create the IntroducerService, attach it to this node, and arrange
        for its FURL to be published once the Tub is ready."""
        introducerservice = IntroducerService(self.basedir)
        self.add_service(introducerservice)

        d = self.when_tub_ready()
        def _publish(res):
            # Runs after when_tub_ready() fires: register the service with
            # the Tub, then record the FURL in the log and in a config file.
            # NOTE(review): the registration name "introducer" was restored
            # from a truncated call -- confirm against the deployed FURLs.
            self.introducer_url = self.tub.registerReference(introducerservice,
                                                             "introducer")
            self.log(" introducer is at %s" % self.introducer_url)
            self.write_config("introducer.furl", self.introducer_url + "\n")
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)

    def init_web(self, webport):
        """Create the introducer's web server on `webport` (a strports
        string) and attach it to this node as a child service."""
        self.log("init_web(webport=%s)", args=(webport,))

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = os.path.join(self.basedir, "node.url")
        ws = IntroducerWebishServer(webport, nodeurl_path)
        # without this the server object would be created and then dropped
        self.add_service(ws)
class IntroducerService(service.MultiService, Referenceable):
    """The server side of the introducer.

    I remember every announcement that has been published to me, and I
    forward announcements to the subscribers that asked for the matching
    service name.

    An announcement is a 6-tuple:
    (furl, service_name, ri_name, nickname, ver, oldest).
    """
    implements(RIIntroducerPublisherAndSubscriberService)

    def __init__(self, basedir="."):
        """Create an empty introducer. `basedir` is accepted but is not
        used by this class."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        self._announcements = set()
        # maps service_name -> set of subscriber references
        self._subscribers = {}

    def log(self, *args, **kwargs):
        """Forward to log.msg(), defaulting the facility to
        "tahoe.introducer"."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return an immutable snapshot of all known announcements."""
        return frozenset(self._announcements)

    def get_subscribers(self):
        """Return the live service_name -> set-of-subscribers dict (not a
        copy)."""
        return self._subscribers

    def remote_publish(self, announcement):
        """Record a new announcement and forward it to every subscriber of
        its service name. Announcements we already hold are ignored."""
        self.log("introducer: announcement published: %s" % (announcement,) )
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
        if announcement in self._announcements:
            self.log("but we already knew it, ignoring", level=log.NOISY)
            # do not re-broadcast something the subscribers already have
            return
        self._announcements.add(announcement)
        for s in self._subscribers.get(service_name, []):
            d = s.callRemote("announce", set([announcement]))
            # log delivery failures the same way remote_subscribe() does,
            # instead of leaving an unhandled error in the Deferred
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.UNUSUAL)

    def remote_subscribe(self, subscriber, service_name):
        """Register `subscriber` for `service_name` and immediately send it
        every announcement we already hold for that service.

        The subscriber is dropped again when its connection is lost.
        A repeated subscription from the same subscriber is ignored.
        """
        self.log("introducer: subscription[%s] request at %s" % (service_name,
                                                                 subscriber))
        if service_name not in self._subscribers:
            self._subscribers[service_name] = set()
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            # NOTE(review): log level restored from a truncated call --
            # confirm that UNUSUAL is the intended severity.
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL)
            return
        subscribers.add(subscriber)
        def _remove():
            # disconnect handler: forget this subscriber
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber))
            subscribers.remove(subscriber)
        subscriber.notifyOnDisconnect(_remove)

        # catch the new subscriber up on what has already been published;
        # index 1 of an announcement tuple is its service_name
        announcements = set( [ a
                               for a in self._announcements
                               if a[1] == service_name ] )
        d = subscriber.callRemote("announce", announcements)
        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
class RemoteServiceConnector:
    """I hold information about a peer service that we want to connect to. If
    we are connected, I hold the RemoteReference, the peer's address, and the
    peer's version information. I remember information about when we were
    last connected to the peer too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the peer's version, from the most recent announcement
    @ivar oldest_supported: the peer's oldest supported version, same
    @ivar nickname: the peer's self-reported nickname, same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """

    def __init__(self, announcement, tub, ic):
        """Prepare (but do not start) a connection to an announced service.

        @param announcement: the 6-tuple
               (furl, service_name, ri_name, nickname, ver, oldest)
        @param tub: the Tub used to establish the connection
        @param ic: the owning introducer client; it receives log(),
                   add_connection() and remove_connection() calls
        @raise ValueError: if the furl does not start with 'pb://<tubid>@'
        """
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
        self._tub = tub
        self._announcement = announcement
        self._ic = ic

        self._furl = furl
        # the node id is the base32 text between 'pb://' and '@'
        m = re.match(r'pb://(\w+)@', furl)
        if m is None:
            # the furl comes from a remote announcement, so fail with a
            # clear error instead of an AttributeError on m.group()
            raise ValueError("unable to parse nodeid from furl %r" % (furl,))
        self._nodeid = b32decode(m.group(1).upper())
        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)

        # one connector per (nodeid, service_name) pair
        self._index = (self._nodeid, service_name)
        self.service_name = service_name

        # set by startConnecting(); None means "never started"
        self._reconnector = None

        self.log("attempting to connect to %s" % self._nodeid_s)
        self.announcement_time = time.time()
        self.last_loss_time = None
        self.rref = None
        self.remote_host = None
        self.last_connect_time = None
        self.version = ver
        self.oldest_supported = oldest
        self.nickname = nickname

    def log(self, *args, **kwargs):
        """Log through the owning introducer client."""
        return self._ic.log(*args, **kwargs)

    def get_index(self):
        """Return the (nodeid, service_name) key that identifies me."""
        return self._index

    def startConnecting(self):
        """Ask the Tub to connect to the peer; _got_service() is given the
        RemoteReference for each connection that is established."""
        self._reconnector = self._tub.connectTo(self._furl, self._got_service)

    def stopConnecting(self):
        """Stop trying to connect. A no-op if startConnecting() was never
        called."""
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def _got_service(self, rref):
        """Connection established: record it and tell the client."""
        self.last_connect_time = time.time()
        self.remote_host = rref.tracker.broker.transport.getPeer()

        self.rref = rref
        self.log("connected to %s" % self._nodeid_s)

        self._ic.add_connection(self._nodeid, self.service_name, rref)

        # pass rref along so _lost() can tell the client which connection
        # went away
        rref.notifyOnDisconnect(self._lost, rref)

    def _lost(self, rref):
        """Connection lost: clear the live state and tell the client."""
        self.log("lost connection to %s" % self._nodeid_s)
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None
        self._ic.remove_connection(self._nodeid, self.service_name, rref)
class IntroducerClient(service.Service, Referenceable):
    """The client side of the introducer protocol.

    I connect to the introducer named by `introducer_furl`, publish our own
    service announcements to it, subscribe to the service names we care
    about, and run one RemoteServiceConnector per announced
    (peerid, service_name) pair.

    An announcement is a 6-tuple:
    (furl, service_name, ri_name, nickname, ver, oldest).
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Remember our identity and the introducer's location. No network
        activity happens until startService()."""
        self._tub = tub
        self.introducer_furl = introducer_furl

        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        self._published_announcements = set()

        self._publisher = None
        self._connected = False

        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent
        self._received_announcements = set()
        # TODO: this set will grow without bound, until the node is restarted

        # we only accept one announcement per (peerid+service_name) pair.
        # This insures that an upgraded host replace their previous
        # announcement. It also means that each peer must have their own Tub
        # (no sharing), which is slightly weird but consistent with the rest
        # of the Tahoe codebase.
        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
        # self._connections is a set of (peerid, service_name, rref) tuples
        self._connections = set()

        self.counter = 0 # incremented each time we change state, for tests
        self.encoding_parameters = None

    def startService(self):
        """Start connecting to the introducer."""
        service.Service.startService(self)
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure)
        # this separate getReference() exists only so that a failure of the
        # first attempt gets logged
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Connected to the introducer: send it any pending publish and
        subscribe requests."""
        self.log("connected to introducer")
        self._connected = True
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Lost the introducer. Forget which subscriptions were sent so
        they are re-sent on the next connection."""
        self.log("bummer, we've lost our connection to the introducer")
        self._connected = False
        self._publisher = None
        self._subscriptions.clear()

    def stopService(self):
        """Stop reconnecting to the introducer and to every peer."""
        service.Service.stopService(self)
        self._introducer_reconnector.stopConnecting()
        for rsc in self._connectors.itervalues():
            rsc.stopConnecting()

    def log(self, *args, **kwargs):
        """Forward to log.msg(), defaulting the facility to
        "tahoe.introducer"."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Announce one of our services. It is sent now if we are connected
        to the introducer, otherwise when the connection is made."""
        ann = (furl, service_name, remoteinterface_name,
               self._nickname, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name):
        """Ask to hear about every peer that offers `service_name`."""
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()

    def _maybe_subscribe(self):
        """Send any subscription requests that have not been sent on the
        current introducer connection."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                # NOTE(review): level restored from a truncated call --
                # confirm WEIRD is the intended severity.
                d.addErrback(log.err, facility="tahoe.introducer",
                             level=log.WEIRD)

    def _maybe_publish(self):
        """Send all of our announcements to the introducer, if connected."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            d = self._publisher.callRemote("publish", ann)
            # NOTE(review): level restored from a truncated call --
            # confirm WEIRD is the intended severity.
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.WEIRD)



    def remote_announce(self, announcements):
        """Called by the introducer with a collection of announcements.

        Announcements for services we never subscribed to, and
        announcements we have already seen, are logged and skipped.
        """
        # log the batch size once, not once per announcement
        self.log("received %d announcements" % len(announcements))
        for ann in announcements:
            (furl, service_name, ri_name, nickname, ver, oldest) = ann
            if service_name not in self._subscribed_service_names:
                self.log("announcement for a service we don't care about [%s]"
                         % (service_name,), level=log.WEIRD)
                continue
            if ann in self._received_announcements:
                # NOTE(review): level restored from a truncated call --
                # confirm NOISY is the intended severity.
                self.log("ignoring old announcement: %s" % (ann,),
                         level=log.NOISY)
                continue
            self.log("new announcement[%s]: %s" % (service_name, ann))
            self._received_announcements.add(ann)
            self._new_announcement(ann)

    def _new_announcement(self, announcement):
        """Start a connector for `announcement`, replacing any connector
        that already exists for the same (peerid, service_name)."""
        # this will only be called for new announcements
        rsc = RemoteServiceConnector(announcement, self._tub, self)
        index = rsc.get_index()
        if index in self._connectors:
            self._connectors[index].stopConnecting()
        self._connectors[index] = rsc
        rsc.startConnecting()

    def add_connection(self, nodeid, service_name, rref):
        """Called by a connector when its connection is established."""
        self._connections.add( (nodeid, service_name, rref) )
        self.counter += 1

    def remove_connection(self, nodeid, service_name, rref):
        """Called by a connector when its connection is lost."""
        self._connections.discard( (nodeid, service_name, rref) )
        self.counter += 1


    def get_all_connections(self):
        """Return a frozenset of (peerid, service_name, rref) tuples."""
        return frozenset(self._connections)

    def get_all_connectors(self):
        """Return a shallow copy of the index -> connector dict."""
        return self._connectors.copy()

    def get_all_peerids(self):
        """Return a frozenset of the peerids we are connected to."""
        return frozenset([peerid
                          for (peerid, service_name, rref)
                          in self._connections])

    def get_all_connections_for(self, service_name):
        """Return a frozenset of the connection tuples whose service name
        is `service_name`."""
        return frozenset([c
                          for c in self._connections
                          if c[1] == service_name])

    def get_permuted_peers(self, service_name, key):
        """Return an ordered list of (peerid, rref) tuples."""
        # peers are ordered by the SHA-1 digest of (key + peerid)
        results = []
        for (c_peerid, c_service_name, rref) in self._connections:
            assert isinstance(c_peerid, str)
            if c_service_name != service_name:
                continue
            permuted = sha.new(key + c_peerid).digest()
            results.append((permuted, c_peerid, rref))

        # compare on the digest only, exactly as the old cmp function did
        results.sort(key=lambda r: r[0])
        return [ (r[1], r[2]) for r in results ]



    def remote_set_encoding_parameters(self, parameters):
        """Store the encoding parameters sent by the introducer."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True while we hold a connection to the introducer."""
        return self._connected

    def debug_disconnect_from_peerid(self, victim_nodeid):
        """Drop every connection to `victim_nodeid`."""
        # for unit tests: locate and sever all connections to the given
        # peerid.
        # Iterate over a snapshot, in case a disconnect handler runs
        # synchronously and calls remove_connection() while we loop.
        for (nodeid, service_name, rref) in list(self._connections):
            if nodeid == victim_nodeid:
                rref.tracker.broker.transport.loseConnection()