3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8 RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10 convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11 make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
class WrapV2ClientInV1Interface(Referenceable): # for_v1
    """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
    can be attached to an old server.

    Each remote_* method translates its arguments where necessary and
    forwards the call to the wrapped client, held in 'original'."""
    implements(RIIntroducerSubscriberClient_v1)

    def __init__(self, original):
        # the v2 client object that receives the forwarded calls
        self.original = original

    def remote_announce(self, announcements):
        """Convert a batch of v1 announcements to v2 form and hand them to
        the wrapped client's got_announcements().

        Returns whatever got_announcements() returns."""
        lp = self.original.log("received %d announcements (v1)" %
                               len(announcements))
        # The converted values are v2-format announcements (hence the local
        # name); collecting them in a set collapses identical entries.
        anns_v2 = set(convert_announcement_v1_to_v2(ann_v1)
                      for ann_v1 in announcements)
        return self.original.got_announcements(anns_v2, lp)

    def remote_set_encoding_parameters(self, parameters):
        """Forward the encoding parameters to the wrapped client."""
        self.original.remote_set_encoding_parameters(parameters)
class RIStubClient(RemoteInterface): # for_v1
    """The (empty) RemoteInterface for the StubClient.

    Each client publishes a service announcement for a dummy object called
    the StubClient. That object does not offer any services; the
    announcement only helps the Introducer keep track of which clients are
    subscribed, so the grid admin can keep track of things like the size of
    the grid and the client versions in use."""
class StubClient(Referenceable): # for_v1
    """Dummy Referenceable with no remote methods. It is registered with the
    Tub and announced as the 'stub_client' service when talking to a v1
    introducer."""
    implements(RIStubClient)
# Version identifiers looked up in the introducer's advertised version
# dictionary to decide which protocol methods may be called on it.
_INTRODUCER_PROTOCOL_PREFIX = "http://allmydata.org/tahoe/protocols/introducer/"
V1 = _INTRODUCER_PROTOCOL_PREFIX + "v1"
V2 = _INTRODUCER_PROTOCOL_PREFIX + "v2"
class IntroducerClient(service.Service, Referenceable):
    """Client side of the introducer protocol.

    I connect to the introducer named by 'introducer_furl', publish this
    node's announcements through it, subscribe to the service names that
    local code asked for via subscribe_to(), and deliver every accepted
    inbound announcement to the matching local callbacks.

    The V2 methods are used when the introducer advertises V2; otherwise I
    fall back to the V1 methods (code paths marked 'for_v1')."""
    implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported,
                 app_versions):
        """
        tub: used for connectTo(), getReference() and registerReference().
        introducer_furl: FURL of the introducer to connect to.
        nickname: this node's nickname; must be a unicode object.
        my_version, oldest_supported, app_versions: copied as-is into the
            subscriber info and into every announcement we create.
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported
        self._app_versions = app_versions

        # sent along with each v2 subscription request
        self._my_subscriber_info = { "version": 0,
                                     "nickname": self._nickname,
                                     "app-versions": self._app_versions,
                                     "my-version": self._my_version,
                                     "oldest-supported": self._oldest_supported,
                                     }
        self._stub_client = None # for_v1
        self._stub_client_furl = None

        # maps service_name to the signed announcement we publish for it
        self._published_announcements = {}
        self._canary = Referenceable()

        # the versioned remote reference to the introducer, or None while
        # we are not connected
        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores tuples of
        # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
        # dicts can be compared for equality to distinguish re-announcement
        # from updates. It also provides memory for clients who subscribe
        # after announcements have arrived.
        self._current_announcements = {}

        self.encoding_parameters = None

        # hooks for unit tests. Every key that is incremented anywhere in
        # this class must be present here, otherwise the first increment
        # raises KeyError.
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """Note that one outstanding remote call has finished.

        Used with addBoth(), so the result (or Failure) is returned
        unchanged for the callbacks and errbacks added after it."""
        self._debug_outstanding -= 1
        return res

    def startService(self):
        """Start the service and begin connecting to the introducer."""
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # This separate getReference() exists only so that a failure of the
        # first connection attempt gets logged; its result is not used.
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Callback given to connectTo(): fetch the introducer's version
        information before using the new connection."""
        self.log("connected to introducer, getting versions")
        # version dictionary to assume when the remote side has no
        # get_version(): a bare V1 introducer
        default = { V1: { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Remember a failure that happened while setting up the
        connection."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Accept the connected introducer, then send any pending
        publish and subscribe requests.

        Raises InsufficientVersionError if the introducer advertises
        neither V1 nor V2."""
        self.log("got introducer version: %s" % (publisher.version,))
        # we require an introducer that speaks at least one of (V1, V2)
        if not (V1 in publisher.version or V2 in publisher.version):
            raise InsufficientVersionError("V1 or V2", publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the lost connection and the subscription requests sent
        over it, so they are sent again on the next connection."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Emit a log message via log.msg(), using the introducer-client
        facility unless the caller supplied one. Returns log.msg()'s
        result, which callers pass on as parent=."""
        kwargs.setdefault("facility", "tahoe.introducer.client")
        return log.msg(*args, **kwargs)

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Arrange for cb(key_s, ann, *args, **kwargs) to be called (via
        eventually()) for every announcement of 'service_name', including
        the ones that have already been received."""
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        # replay the announcements that arrived before this subscription
        for index,(ann,key_s,when) in self._current_announcements.items():
            servicename = index[0]
            if servicename == service_name:
                eventually(cb, key_s, ann, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send a subscription request for each wanted service name that
        has not been requested over the current connection. Does nothing
        while disconnected."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name in self._subscriptions:
                continue
            self._subscriptions.add(service_name)
            if V2 in self._publisher.version:
                self._debug_outstanding += 1
                # This class implements the v2 subscriber interface, so we
                # pass ourselves; the v1 path needs an adapter instead.
                # NOTE(review): the argument list was reconstructed by
                # analogy with the v1 'subscribe' call -- confirm against
                # the introducer's subscribe_v2 signature.
                d = self._publisher.callRemote("subscribe_v2",
                                               self, service_name,
                                               self._my_subscriber_info)
                d.addBoth(self._debug_retired)
            else:
                d = self._subscribe_handle_v1(service_name) # for_v1
            d.addErrback(log.err, facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="2uMScQ")

    def _subscribe_handle_v1(self, service_name): # for_v1
        """Subscribe through the v1 'subscribe' method and then publish the
        stub_client announcement. Returns the Deferred for the whole
        sequence, so the caller can attach its errback."""
        # they don't speak V2: must be a v1 introducer. Fall back to the v1
        # 'subscribe' method, using a client adapter.
        ca = WrapV2ClientInV1Interface(self)
        self._debug_outstanding += 1
        d = self._publisher.callRemote("subscribe", ca, service_name)
        d.addBoth(self._debug_retired)
        # We must also publish an empty 'stub_client' object, so the
        # introducer can count how many clients are connected and see what
        # versions they're running.
        if not self._stub_client_furl:
            self._stub_client = sc = StubClient()
            self._stub_client_furl = self._tub.registerReference(sc)
        def _publish_stub_client(ignored):
            furl = self._stub_client_furl
            self.publish("stub_client",
                         { "anonymous-storage-FURL": furl,
                           "permutation-seed-base32": get_tubid_string(furl),
                           })
        d.addCallback(_publish_stub_client)
        return d

    def create_announcement(self, service_name, ann, signing_key):
        """Build the signed announcement for 'service_name'.

        Our standard fields are filled in first and then updated with the
        caller's 'ann' dict, so keys in 'ann' take precedence. Returns the
        result of sign_to_foolscap()."""
        full_ann = { "version": 0,
                     "nickname": self._nickname,
                     "app-versions": self._app_versions,
                     "my-version": self._my_version,
                     "oldest-supported": self._oldest_supported,

                     "service-name": service_name,
                     }
        full_ann.update(ann)
        return sign_to_foolscap(full_ann, signing_key)

    def publish(self, service_name, ann, signing_key=None):
        """Record the announcement for 'service_name' (replacing any
        earlier one for that name) and send it if we are connected."""
        ann_t = self.create_announcement(service_name, ann, signing_key)
        self._published_announcements[service_name] = ann_t
        self._maybe_publish()

    def _maybe_publish(self):
        """Send every recorded announcement to the introducer. Does
        nothing while disconnected."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann_t in self._published_announcements.values():
            self._debug_counts["outbound_message"] += 1
            if V2 in self._publisher.version:
                self._debug_outstanding += 1
                # NOTE(review): passing self._canary as the second argument
                # was reconstructed (it is the only use of the canary in
                # this class) -- confirm against the introducer's
                # publish_v2 signature.
                d = self._publisher.callRemote("publish_v2", ann_t,
                                               self._canary)
                d.addBoth(self._debug_retired)
            else:
                d = self._handle_v1_publisher(ann_t) # for_v1
            d.addErrback(log.err, ann_t=ann_t,
                         facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="xs9pVQ")

    def _handle_v1_publisher(self, ann_t): # for_v1
        """Publish 'ann_t' through the v1 'publish' method. Returns the
        Deferred of the remote call, so the caller can attach its
        errback."""
        # they don't speak V2, so fall back to the old 'publish' method
        # (which takes an unsigned tuple of bytestrings)
        self.log("falling back to publish_v1",
                 level=log.UNUSUAL, umid="9RCT1A")
        ann_v1 = convert_announcement_v2_to_v1(ann_t)
        self._debug_outstanding += 1
        d = self._publisher.callRemote("publish", ann_v1)
        d.addBoth(self._debug_retired)
        return d


    def remote_announce_v2(self, announcements):
        """Remote entry point for a batch of v2 announcements."""
        lp = self.log("received %d announcements (v2)" % len(announcements))
        return self.got_announcements(announcements, lp)

    def got_announcements(self, announcements, lp=None):
        """Unpack each announcement and process it; announcements with a
        bad signature are logged and skipped.

        'lp' is used as the parent of the log messages emitted here."""
        # this is the common entry point for both v1 and v2 announcements
        self._debug_counts["inbound_message"] += 1
        for ann_t in announcements:
            try:
                # this might raise UnknownKeyError or bad-sig error
                # NOTE(review): only BadSignatureError is handled below; an
                # UnknownKeyError would propagate and abort the batch.
                ann, key_s = unsign_from_foolscap(ann_t)
                # key is "v0-base32abc123"
            except BadSignatureError:
                self.log("bad signature on inbound announcement: %s" % (ann_t,),
                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
                # process other announcements that arrived with the bad one
                continue

            self._process_announcement(ann, key_s)

    def _process_announcement(self, ann, key_s):
        """Handle one unpacked announcement: ignore it if it is for an
        unsubscribed service or is an exact repeat, otherwise remember it
        and notify the local subscribers of its service."""
        self._debug_counts["inbound_announcement"] += 1
        service_name = str(ann["service-name"])
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        # for ASCII values, simplejson might give us unicode *or* bytes
        if "nickname" in ann and isinstance(ann["nickname"], str):
            ann["nickname"] = unicode(ann["nickname"])
        nick_s = ann.get("nickname",u"").encode("utf-8")
        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
                       nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")

        # how do we describe this node in the logs?
        desc_bits = []
        # presumably key_s is None for unsigned (v1) announcements, which
        # cannot be sliced -- TODO confirm against unsign_from_foolscap()
        if key_s:
            desc_bits.append("serverid=" + key_s[:20])
        if "anonymous-storage-FURL" in ann:
            tubid_s = get_tubid_string_from_ann(ann)
            desc_bits.append("tubid=" + tubid_s[:8])
        description = "/".join(desc_bits)

        # the index is used to track duplicates
        index = make_index(ann, key_s)
        # stored values are (ann, key_s, rxtime) tuples, so None reliably
        # means "nothing seen for this index yet"
        previous = self._current_announcements.get(index)

        # is this announcement a duplicate?
        if previous is not None and previous[0] == ann:
            self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
                     service=service_name, description=description,
                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        # does it update an existing one?
        if previous is not None:
            self._debug_counts["update"] += 1
            self.log("replacing old announcement: %s" % (ann,),
                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
        else:
            self._debug_counts["new_announcement"] += 1
            self.log("new announcement[%s]" % service_name,
                     parent=lp2, level=log.NOISY)

        self._current_announcements[index] = (ann, key_s, time.time())
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, key_s, ann, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Store the encoding parameters received from the remote side."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True while we hold a reference to the introducer."""
        return bool(self._publisher)