3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10 RemoteInterface, Referenceable, eventually, SturdyRef
11 from allmydata.introducer.common import SubscriberDescriptor, \
12 AnnouncementDescriptor
13 FURL = StringConstraint(1000)
15 # We keep a copy of the old introducer (both client and server) here to
16 # support compatibility tests. The old client is supposed to handle the new
17 # server, and new client is supposed to handle the old server.
20 # Announcements are (FURL, service_name, remoteinterface_name,
21 # nickname, my_version, oldest_supported)
22 # the (FURL, service_name, remoteinterface_name) refer to the service being
23 # announced. The (nickname, my_version, oldest_supported) refer to the
24 # client as a whole. The my_version/oldest_supported strings can be parsed
25 # by an allmydata.util.version.Version instance, and then compared. The
26 # first goal is to make sure that nodes are not confused by speaking to an
27 # incompatible peer. The second goal is to enable the development of
28 # backwards-compatibility code.
30 Announcement = TupleOf(FURL, str, str,
class RIIntroducerSubscriberClient_v1(RemoteInterface):
    # Foolscap remote interface for the subscriber side of the v1 introducer
    # protocol. The `subscriber` argument of the subscribe() methods declared
    # in this file is constrained to this interface.
    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"

    # The default value is the Foolscap constraint for the argument: a set of
    # Announcement tuples.
    def announce(announcements=SetOf(Announcement)):
        """I accept announcements from the publisher."""
40 # When Foolscap can handle multiple interfaces (Foolscap#17), the
41 # full-powered introducer will implement both RIIntroducerPublisher and
42 # RIIntroducerSubscriberService. Until then, we define
43 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
44 # make everybody use that.
46 class RIIntroducerPublisher_v1(RemoteInterface):
47 """To publish a service to the world, connect to me and give me your
48 announcement message. I will deliver a copy to all connected subscribers."""
49 __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
51 def publish(announcement=Announcement):
55 class RIIntroducerSubscriberService_v1(RemoteInterface):
56 __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
58 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
59 """Give me a subscriber reference, and I will call its new_peers()
60 method with any announcements that match the desired service name. I
61 will ignore duplicate subscriptions.
65 class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
66 __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
68 return DictOf(str, Any())
69 def publish(announcement=Announcement):
71 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
74 class IIntroducerClient(Interface):
75 """I provide service introduction facilities for a node. I help nodes
76 publish their services to the rest of the world, and I help them learn
77 about services available on other nodes."""
79 def publish(furl, service_name, remoteinterface_name):
80 """Once you call this, I will tell the world that the Referenceable
81 available at FURL is available to provide a service named
82 SERVICE_NAME. The precise definition of the service being provided is
83 identified by the Foolscap 'remote interface name' in the last
84 parameter: this is supposed to be a globally-unique string that
85 identifies the RemoteInterface that is implemented."""
87 def subscribe_to(service_name, callback, *args, **kwargs):
88 """Call this if you will eventually want to use services with the
89 given SERVICE_NAME. This will prompt me to subscribe to announcements
90 of those services. Your callback will be invoked with at least two
91 arguments: a serverid (binary string), and an announcement
92 dictionary, followed by any additional callback args/kwargs you give
93 me. I will run your callback for both new announcements and for
94 announcements that have changed, but you must be prepared to tolerate
97 The announcement dictionary that I give you will have the following
101 service-name: str('storage')
104 remoteinterface-name: str(ri_name)
108 oldest-supported: str
110 Note that app-versions will be an empty dictionary until #466 is done
111 and both the introducer and the remote client have been upgraded. For
112 current (native) server types, the serverid will always be equal to
113 the binary form of the FURL's tubid.
116 def connected_to_introducer():
117 """Returns a boolean, True if we are currently connected to the
118 introducer, False if not."""
121 class IntroducerClient_v1(service.Service, Referenceable):
122 implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
124 def __init__(self, tub, introducer_furl,
125 nickname, my_version, oldest_supported):
127 self.introducer_furl = introducer_furl
129 assert type(nickname) is unicode
130 self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
131 self._my_version = my_version
132 self._oldest_supported = oldest_supported
134 self._published_announcements = set()
136 self._publisher = None
138 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
139 self._subscribed_service_names = set()
140 self._subscriptions = set() # requests we've actually sent
142 # _current_announcements remembers one announcement per
143 # (servicename,serverid) pair. Anything that arrives with the same
144 # pair will displace the previous one. This stores unpacked
145 # announcement dictionaries, which can be compared for equality to
146 # distinguish re-announcement from updates. It also provides memory
147 # for clients who subscribe after startup.
148 self._current_announcements = {}
150 # hooks for unit tests
151 self._debug_counts = {
152 "inbound_message": 0,
153 "inbound_announcement": 0,
155 "duplicate_announcement": 0,
157 "new_announcement": 0,
158 "outbound_message": 0,
160 self._debug_outstanding = 0
162 def _debug_retired(self, res):
163 self._debug_outstanding -= 1
def startService(self):
    """Start the service and begin connecting to the introducer.

    Resets ``_introducer_error``, asks the tub to connect to
    ``introducer_furl`` with ``_got_introducer`` as the callback, and keeps
    the object returned by ``connectTo`` in ``_introducer_reconnector``.
    A separate ``getReference`` call on the same FURL carries an errback
    that logs a failure of that attempt.
    """
    service.Service.startService(self)
    self._introducer_error = None
    self._introducer_reconnector = self._tub.connectTo(
        self.introducer_furl, self._got_introducer)

    def _log_initial_failure(failure):
        # Only logs; the failure is consumed here.
        self.log("Initial Introducer connection failed: perhaps it's down",
                 level=log.WEIRD, failure=failure, umid="c5MqUQ")

    first_attempt = self._tub.getReference(self.introducer_furl)
    first_attempt.addErrback(_log_initial_failure)
177 def _got_introducer(self, publisher):
178 self.log("connected to introducer, getting versions")
179 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
181 "application-version": "unknown: no get_version()",
183 d = rrefutil.add_version_to_remote_reference(publisher, default)
184 d.addCallback(self._got_versioned_introducer)
185 d.addErrback(self._got_error)
187 def _got_error(self, f):
188 # TODO: for the introducer, perhaps this should halt the application
189 self._introducer_error = f # polled by tests
191 def _got_versioned_introducer(self, publisher):
192 self.log("got introducer version: %s" % (publisher.version,))
193 # we require a V1 introducer
194 needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
195 if needed not in publisher.version:
196 raise InsufficientVersionError(needed, publisher.version)
197 self._publisher = publisher
198 publisher.notifyOnDisconnect(self._disconnected)
199 self._maybe_publish()
200 self._maybe_subscribe()
202 def _disconnected(self):
203 self.log("bummer, we've lost our connection to the introducer")
204 self._publisher = None
205 self._subscriptions.clear()
def log(self, *args, **kwargs):
    """Forward to ``log.msg``, defaulting ``facility`` to "tahoe.introducer".

    A ``facility`` keyword supplied by the caller is left untouched.

    :return: whatever ``log.msg`` returns.
    """
    kwargs.setdefault("facility", "tahoe.introducer")
    return log.msg(*args, **kwargs)
def publish(self, furl, service_name, remoteinterface_name):
    """Remember an announcement for a local service and try to send it.

    The announcement is the 6-tuple (furl, service_name,
    remoteinterface_name, nickname, my_version, oldest_supported); the last
    three come from this client's own attributes.

    :param furl: FURL of the service being announced.
    :param service_name: name of the service.
    :param remoteinterface_name: name of the RemoteInterface it implements.
    """
    nickname_utf8 = self._nickname_utf8
    assert type(nickname_utf8) is str # we always send UTF-8
    service_part = (furl, service_name, remoteinterface_name)
    client_part = (nickname_utf8, self._my_version, self._oldest_supported)
    self._published_announcements.add(service_part + client_part)
    self._maybe_publish()
def subscribe_to(self, service_name, cb, *args, **kwargs):
    """Register a local callback for announcements of ``service_name``.

    :param service_name: the service whose announcements are wanted.
    :param cb: callable invoked as ``cb(nodeid, ann_d, *args, **kwargs)``.
    :param args: extra positional arguments passed to ``cb``.
    :param kwargs: extra keyword arguments passed to ``cb``.

    The subscription is recorded, a subscribe request is sent if possible,
    and every announcement already remembered for this service is replayed
    to ``cb`` through ``eventually``.
    """
    self._local_subscribers.append( (service_name,cb,args,kwargs) )
    self._subscribed_service_names.add(service_name)
    self._maybe_subscribe()
    # Replay what we already know. The subscriber's extra args/kwargs must be
    # passed here too, so the callback sees the same signature as it does for
    # announcements that arrive later.
    for (servicename,nodeid),ann_d in self._current_announcements.items():
        if servicename == service_name:
            eventually(cb, nodeid, ann_d, *args, **kwargs)
228 def _maybe_subscribe(self):
229 if not self._publisher:
230 self.log("want to subscribe, but no introducer yet",
233 for service_name in self._subscribed_service_names:
234 if service_name not in self._subscriptions:
235 # there is a race here, but the subscription desk ignores
236 # duplicate requests.
237 self._subscriptions.add(service_name)
238 self._debug_outstanding += 1
239 d = self._publisher.callRemote("subscribe", self, service_name)
240 d.addBoth(self._debug_retired)
241 d.addErrback(rrefutil.trap_deadref)
242 d.addErrback(log.err, format="server errored during subscribe",
243 facility="tahoe.introducer",
244 level=log.WEIRD, umid="2uMScQ")
246 def _maybe_publish(self):
247 if not self._publisher:
248 self.log("want to publish, but no introducer yet", level=log.NOISY)
250 # this re-publishes everything. The Introducer ignores duplicates
251 for ann in self._published_announcements:
252 self._debug_counts["outbound_message"] += 1
253 self._debug_outstanding += 1
254 d = self._publisher.callRemote("publish", ann)
255 d.addBoth(self._debug_retired)
256 d.addErrback(rrefutil.trap_deadref)
257 d.addErrback(log.err,
258 format="server errored during publish %(ann)s",
259 ann=ann, facility="tahoe.introducer",
260 level=log.WEIRD, umid="xs9pVQ")
264 def remote_announce(self, announcements):
265 self.log("received %d announcements" % len(announcements))
266 self._debug_counts["inbound_message"] += 1
267 for ann in announcements:
269 self._process_announcement(ann)
271 log.err(format="unable to process announcement %(ann)s",
273 # Don't let a corrupt announcement prevent us from processing
274 # the remaining ones. Don't return an error to the server,
275 # since they'd just ignore it anyways.
278 def _process_announcement(self, ann):
279 self._debug_counts["inbound_announcement"] += 1
280 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
281 if service_name not in self._subscribed_service_names:
282 self.log("announcement for a service we don't care about [%s]"
283 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
284 self._debug_counts["wrong_service"] += 1
286 self.log("announcement for [%s]: %s" % (service_name, ann),
288 assert type(furl) is str
289 assert type(service_name) is str
290 assert type(ri_name) is str
291 assert type(nickname_utf8) is str
292 nickname = nickname_utf8.decode("utf-8")
293 assert type(nickname) is unicode
294 assert type(ver) is str
295 assert type(oldest) is str
297 nodeid = b32decode(SturdyRef(furl).tubID.upper())
298 nodeid_s = idlib.shortnodeid_b2a(nodeid)
300 ann_d = { "version": 0,
301 "service-name": service_name,
304 "nickname": nickname,
305 "app-versions": {}, # need #466 and v2 introducer
307 "oldest-supported": oldest,
310 index = (service_name, nodeid)
311 if self._current_announcements.get(index, None) == ann_d:
312 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
313 service=service_name, nodeid=nodeid_s,
314 level=log.UNUSUAL, umid="B1MIdA")
315 self._debug_counts["duplicate_announcement"] += 1
317 if index in self._current_announcements:
318 self._debug_counts["update"] += 1
320 self._debug_counts["new_announcement"] += 1
322 self._current_announcements[index] = ann_d
323 # note: we never forget an index, but we might update its value
325 for (service_name2,cb,args,kwargs) in self._local_subscribers:
326 if service_name2 == service_name:
327 eventually(cb, nodeid, ann_d, *args, **kwargs)
def connected_to_introducer(self):
    """Report whether an introducer reference is currently held.

    :return: True if ``_publisher`` is truthy, otherwise False.
    """
    if self._publisher:
        return True
    return False
332 class IntroducerService_v1(service.MultiService, Referenceable):
333 implements(RIIntroducerPublisherAndSubscriberService_v1)
335 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
337 "application-version": str(allmydata.__full_version__),
def __init__(self, basedir="."):
    """Create an introducer service with no announcements or subscribers.

    :param basedir: accepted for interface compatibility; it is not
        referenced in this constructor.
    """
    service.MultiService.__init__(self)
    self.introducer_url = None
    # 'index' is (service_name, tubid)
    self._announcements = {} # dict of index -> (announcement, timestamp)
    self._subscribers = {} # [service_name]->[rref]->timestamp
    # Counters for unit tests. Every key that is incremented elsewhere in
    # this class must be initialised here; "inbound_update" is bumped by
    # _publish() when an existing announcement is replaced.
    self._debug_counts = {"inbound_message": 0,
                          "inbound_duplicate": 0,
                          "inbound_update": 0,
                          "outbound_message": 0,
                          "outbound_announcements": 0,
                          "inbound_subscribe": 0}
    self._debug_outstanding = 0
354 def _debug_retired(self, res):
355 self._debug_outstanding -= 1
def log(self, *args, **kwargs):
    """Forward to ``log.msg``, defaulting ``facility`` to "tahoe.introducer".

    A ``facility`` keyword supplied by the caller is left untouched.

    :return: whatever ``log.msg`` returns.
    """
    kwargs.setdefault("facility", "tahoe.introducer")
    return log.msg(*args, **kwargs)
363 def get_announcements(self, include_stub_clients=True):
365 for index, (ann_t, when) in self._announcements.items():
366 (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
367 if service_name == "stub_client" and not include_stub_clients:
369 ann_d = {"nickname": nickname.decode("utf-8", "replace"),
371 "service-name": service_name,
372 "anonymous-storage-FURL": furl,
374 # the V2 introducer uses (service_name, key_s, tubid_s) as an
375 # index, so match that format for AnnouncementDescriptor
376 new_index = (index[0], None, idlib.nodeid_b2a(index[1]))
377 ad = AnnouncementDescriptor(when, new_index, None, ann_d)
378 announcements.append(ad)
381 def get_subscribers(self):
383 for service_name, subscribers in self._subscribers.items():
384 for rref, when in subscribers.items():
385 tubid = rref.getRemoteTubID() or "?"
386 remote_address = rrefutil.stringify_remote_address(rref)
387 nickname, version, app_versions = u"?", u"?", {}
388 sd = SubscriberDescriptor(service_name, when,
389 nickname, version, app_versions,
390 remote_address, tubid)
394 def remote_get_version(self):
397 def remote_publish(self, announcement):
399 self._publish(announcement)
401 log.err(format="Introducer.remote_publish failed on %(ann)s",
402 ann=announcement, level=log.UNUSUAL, umid="620rWA")
def _publish(self, announcement):
    """Record an announcement and forward it to matching subscribers.

    :param announcement: 6-tuple (furl, service_name, ri_name,
        nickname_utf8, ver, oldest).

    Announcements are indexed by (service_name, nodeid), where nodeid is the
    base32-decoded tub ID taken from the FURL. An announcement identical to
    the stored one is counted as a duplicate and otherwise ignored. A new or
    changed announcement is stored with the current time and sent to every
    subscriber of that service name.
    """
    self._debug_counts["inbound_message"] += 1
    self.log("introducer: announcement published: %s" % (announcement,) )
    (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement

    nodeid = b32decode(SturdyRef(furl).tubID.upper())
    index = (service_name, nodeid)

    if index in self._announcements:
        old_announcement = self._announcements[index][0]
        if old_announcement == announcement:
            self.log("but we already knew it, ignoring", level=log.NOISY)
            self._debug_counts["inbound_duplicate"] += 1
            # A duplicate must not refresh the timestamp or be re-sent.
            return
        else:
            self.log("old announcement being updated", level=log.NOISY)
            self._debug_counts["inbound_update"] += 1
    self._announcements[index] = (announcement, time.time())

    for s in self._subscribers.get(service_name, []):
        self._debug_counts["outbound_message"] += 1
        self._debug_counts["outbound_announcements"] += 1
        self._debug_outstanding += 1
        d = s.callRemote("announce", set([announcement]))
        d.addBoth(self._debug_retired)
        d.addErrback(rrefutil.trap_deadref)
        d.addErrback(log.err,
                     format="subscriber errored on announcement %(ann)s",
                     ann=announcement, facility="tahoe.introducer",
                     level=log.UNUSUAL, umid="jfGMXQ")
437 def remote_subscribe(self, subscriber, service_name):
438 self.log("introducer: subscription[%s] request at %s" % (service_name,
440 self._debug_counts["inbound_subscribe"] += 1
441 if service_name not in self._subscribers:
442 self._subscribers[service_name] = {}
443 subscribers = self._subscribers[service_name]
444 if subscriber in subscribers:
445 self.log("but they're already subscribed, ignoring",
448 subscribers[subscriber] = time.time()
450 self.log("introducer: unsubscribing[%s] %s" % (service_name,
452 subscribers.pop(subscriber, None)
453 subscriber.notifyOnDisconnect(_remove)
457 for (sn2,nodeid),(ann,when) in self._announcements.items()
458 if sn2 == service_name] )
460 self._debug_counts["outbound_message"] += 1
461 self._debug_counts["outbound_announcements"] += len(announcements)
462 self._debug_outstanding += 1
463 d = subscriber.callRemote("announce", announcements)
464 d.addBoth(self._debug_retired)
465 d.addErrback(rrefutil.trap_deadref)
466 d.addErrback(log.err,
467 format="subscriber errored during subscribe %(anns)s",
468 anns=announcements, facility="tahoe.introducer",
469 level=log.UNUSUAL, umid="1XChxA")