2 from base64 import b32decode
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, SturdyRef, eventually
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
9 from allmydata.util import log, idlib
10 from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
13 class IntroducerClient(service.Service, Referenceable):
14 implements(RIIntroducerSubscriberClient, IIntroducerClient)
16 def __init__(self, tub, introducer_furl,
17 nickname, my_version, oldest_supported):
19 self.introducer_furl = introducer_furl
21 assert type(nickname) is unicode
22 self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
23 self._my_version = my_version
24 self._oldest_supported = oldest_supported
26 self._published_announcements = set()
28 self._publisher = None
30 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
31 self._subscribed_service_names = set()
32 self._subscriptions = set() # requests we've actually sent
34 # _current_announcements remembers one announcement per
35 # (servicename,serverid) pair. Anything that arrives with the same
36 # pair will displace the previous one. This stores unpacked
37 # announcement dictionaries, which can be compared for equality to
38 # distinguish re-announcement from updates. It also provides memory
39 # for clients who subscribe after startup.
40 self._current_announcements = {}
42 self.encoding_parameters = None
44 # hooks for unit tests
45 self._debug_counts = {
47 "inbound_announcement": 0,
49 "duplicate_announcement": 0,
51 "new_announcement": 0,
52 "outbound_message": 0,
55 def startService(self):
56 service.Service.startService(self)
57 self._introducer_error = None
58 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
59 self._introducer_reconnector = rc
60 def connect_failed(failure):
61 self.log("Initial Introducer connection failed: perhaps it's down",
62 level=log.WEIRD, failure=failure, umid="c5MqUQ")
63 d = self._tub.getReference(self.introducer_furl)
64 d.addErrback(connect_failed)
66 def _got_introducer(self, publisher):
67 self.log("connected to introducer, getting versions")
68 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
70 "application-version": "unknown: no get_version()",
72 d = add_version_to_remote_reference(publisher, default)
73 d.addCallback(self._got_versioned_introducer)
74 d.addErrback(self._got_error)
76 def _got_error(self, f):
77 # TODO: for the introducer, perhaps this should halt the application
78 self._introducer_error = f # polled by tests
80 def _got_versioned_introducer(self, publisher):
81 self.log("got introducer version: %s" % (publisher.version,))
82 # we require a V1 introducer
83 needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
84 if needed not in publisher.version:
85 raise InsufficientVersionError(needed, publisher.version)
86 self._publisher = publisher
87 publisher.notifyOnDisconnect(self._disconnected)
89 self._maybe_subscribe()
91 def _disconnected(self):
92 self.log("bummer, we've lost our connection to the introducer")
93 self._publisher = None
94 self._subscriptions.clear()
96 def log(self, *args, **kwargs):
97 if "facility" not in kwargs:
98 kwargs["facility"] = "tahoe.introducer"
99 return log.msg(*args, **kwargs)
102 def publish(self, furl, service_name, remoteinterface_name):
103 assert type(self._nickname_utf8) is str # we always send UTF-8
104 ann = (furl, service_name, remoteinterface_name,
105 self._nickname_utf8, self._my_version, self._oldest_supported)
106 self._published_announcements.add(ann)
107 self._maybe_publish()
109 def subscribe_to(self, service_name, cb, *args, **kwargs):
110 self._local_subscribers.append( (service_name,cb,args,kwargs) )
111 self._subscribed_service_names.add(service_name)
112 self._maybe_subscribe()
113 for (servicename,nodeid),ann_d in self._current_announcements.items():
114 if servicename == service_name:
115 eventually(cb, nodeid, ann_d)
117 def _maybe_subscribe(self):
118 if not self._publisher:
119 self.log("want to subscribe, but no introducer yet",
122 for service_name in self._subscribed_service_names:
123 if service_name not in self._subscriptions:
124 # there is a race here, but the subscription desk ignores
125 # duplicate requests.
126 self._subscriptions.add(service_name)
127 d = self._publisher.callRemote("subscribe", self, service_name)
128 d.addErrback(trap_deadref)
129 d.addErrback(log.err, format="server errored during subscribe",
130 facility="tahoe.introducer",
131 level=log.WEIRD, umid="2uMScQ")
133 def _maybe_publish(self):
134 if not self._publisher:
135 self.log("want to publish, but no introducer yet", level=log.NOISY)
137 # this re-publishes everything. The Introducer ignores duplicates
138 for ann in self._published_announcements:
139 self._debug_counts["outbound_message"] += 1
140 d = self._publisher.callRemote("publish", ann)
141 d.addErrback(trap_deadref)
142 d.addErrback(log.err,
143 format="server errored during publish %(ann)s",
144 ann=ann, facility="tahoe.introducer",
145 level=log.WEIRD, umid="xs9pVQ")
149 def remote_announce(self, announcements):
150 self.log("received %d announcements" % len(announcements))
151 self._debug_counts["inbound_message"] += 1
152 for ann in announcements:
154 self._process_announcement(ann)
156 log.err(format="unable to process announcement %(ann)s",
158 # Don't let a corrupt announcement prevent us from processing
159 # the remaining ones. Don't return an error to the server,
160 # since they'd just ignore it anyways.
163 def _process_announcement(self, ann):
164 self._debug_counts["inbound_announcement"] += 1
165 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
166 if service_name not in self._subscribed_service_names:
167 self.log("announcement for a service we don't care about [%s]"
168 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
169 self._debug_counts["wrong_service"] += 1
171 self.log("announcement for [%s]: %s" % (service_name, ann),
173 assert type(furl) is str
174 assert type(service_name) is str
175 assert type(ri_name) is str
176 assert type(nickname_utf8) is str
177 nickname = nickname_utf8.decode("utf-8")
178 assert type(nickname) is unicode
179 assert type(ver) is str
180 assert type(oldest) is str
182 nodeid = b32decode(SturdyRef(furl).tubID.upper())
183 nodeid_s = idlib.shortnodeid_b2a(nodeid)
185 ann_d = { "version": 0,
186 "service-name": service_name,
189 "nickname": nickname,
190 "app-versions": {}, # need #466 and v2 introducer
192 "oldest-supported": oldest,
195 index = (service_name, nodeid)
196 if self._current_announcements.get(index, None) == ann_d:
197 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
198 service=service_name, nodeid=nodeid_s,
199 level=log.UNUSUAL, umid="B1MIdA")
200 self._debug_counts["duplicate_announcement"] += 1
202 if index in self._current_announcements:
203 self._debug_counts["update"] += 1
205 self._debug_counts["new_announcement"] += 1
207 self._current_announcements[index] = ann_d
208 # note: we never forget an index, but we might update its value
210 for (service_name2,cb,args,kwargs) in self._local_subscribers:
211 if service_name2 == service_name:
212 eventually(cb, nodeid, ann_d, *args, **kwargs)
214 def remote_set_encoding_parameters(self, parameters):
215 self.encoding_parameters = parameters
217 def connected_to_introducer(self):
218 return bool(self._publisher)