3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8 RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10 convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11 make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
16 class WrapV2ClientInV1Interface(Referenceable): # for_v1
17 """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
18 can be attached to an old server."""
19 implements(RIIntroducerSubscriberClient_v1)
21 def __init__(self, original):
22 self.original = original
24 def remote_announce(self, announcements):
25 lp = self.original.log("received %d announcements (v1)" %
27 anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
28 for ann_v1 in announcements])
29 return self.original.got_announcements(anns_v1, lp)
31 def remote_set_encoding_parameters(self, parameters):
32 self.original.remote_set_encoding_parameters(parameters)
34 class RIStubClient(RemoteInterface): # for_v1
35 """Each client publishes a service announcement for a dummy object called
36 the StubClient. This object doesn't actually offer any services, but the
37 announcement helps the Introducer keep track of which clients are
38 subscribed (so the grid admin can keep track of things like the size of
39 the grid and the client versions in use. This is the (empty)
40 RemoteInterface for the StubClient."""
42 class StubClient(Referenceable): # for_v1
43 implements(RIStubClient)
45 V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
46 V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
48 class IntroducerClient(service.Service, Referenceable):
49 implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
51 def __init__(self, tub, introducer_furl,
52 nickname, my_version, oldest_supported,
53 app_versions, sequencer):
55 self.introducer_furl = introducer_furl
57 assert type(nickname) is unicode
58 self._nickname = nickname
59 self._my_version = my_version
60 self._oldest_supported = oldest_supported
61 self._app_versions = app_versions
62 self._sequencer = sequencer
64 self._my_subscriber_info = { "version": 0,
65 "nickname": self._nickname,
66 "app-versions": self._app_versions,
67 "my-version": self._my_version,
68 "oldest-supported": self._oldest_supported,
70 self._stub_client = None # for_v1
71 self._stub_client_furl = None
73 self._outbound_announcements = {} # not signed
74 self._published_announcements = {} # signed
75 self._canary = Referenceable()
77 self._publisher = None
79 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
80 self._subscribed_service_names = set()
81 self._subscriptions = set() # requests we've actually sent
83 # _inbound_announcements remembers one announcement per
84 # (servicename,serverid) pair. Anything that arrives with the same
85 # pair will displace the previous one. This stores tuples of
86 # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
87 # dicts can be compared for equality to distinguish re-announcement
88 # from updates. It also provides memory for clients who subscribe
90 self._inbound_announcements = {}
92 self.encoding_parameters = None
94 # hooks for unit tests
95 self._debug_counts = {
97 "inbound_announcement": 0,
99 "duplicate_announcement": 0,
101 "new_announcement": 0,
102 "outbound_message": 0,
104 self._debug_outstanding = 0
106 def _debug_retired(self, res):
107 self._debug_outstanding -= 1
110 def startService(self):
111 service.Service.startService(self)
112 self._introducer_error = None
113 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
114 self._introducer_reconnector = rc
115 def connect_failed(failure):
116 self.log("Initial Introducer connection failed: perhaps it's down",
117 level=log.WEIRD, failure=failure, umid="c5MqUQ")
118 d = self._tub.getReference(self.introducer_furl)
119 d.addErrback(connect_failed)
121 def _got_introducer(self, publisher):
122 self.log("connected to introducer, getting versions")
123 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
125 "application-version": "unknown: no get_version()",
127 d = add_version_to_remote_reference(publisher, default)
128 d.addCallback(self._got_versioned_introducer)
129 d.addErrback(self._got_error)
131 def _got_error(self, f):
132 # TODO: for the introducer, perhaps this should halt the application
133 self._introducer_error = f # polled by tests
135 def _got_versioned_introducer(self, publisher):
136 self.log("got introducer version: %s" % (publisher.version,))
137 # we require an introducer that speaks at least one of (V1, V2)
138 if not (V1 in publisher.version or V2 in publisher.version):
139 raise InsufficientVersionError("V1 or V2", publisher.version)
140 self._publisher = publisher
141 publisher.notifyOnDisconnect(self._disconnected)
142 self._maybe_publish()
143 self._maybe_subscribe()
145 def _disconnected(self):
146 self.log("bummer, we've lost our connection to the introducer")
147 self._publisher = None
148 self._subscriptions.clear()
150 def log(self, *args, **kwargs):
151 if "facility" not in kwargs:
152 kwargs["facility"] = "tahoe.introducer.client"
153 return log.msg(*args, **kwargs)
155 def subscribe_to(self, service_name, cb, *args, **kwargs):
156 self._local_subscribers.append( (service_name,cb,args,kwargs) )
157 self._subscribed_service_names.add(service_name)
158 self._maybe_subscribe()
159 for index,(ann,key_s,when) in self._inbound_announcements.items():
160 servicename = index[0]
161 if servicename == service_name:
162 eventually(cb, key_s, ann, *args, **kwargs)
164 def _maybe_subscribe(self):
165 if not self._publisher:
166 self.log("want to subscribe, but no introducer yet",
169 for service_name in self._subscribed_service_names:
170 if service_name in self._subscriptions:
172 self._subscriptions.add(service_name)
173 if V2 in self._publisher.version:
174 self._debug_outstanding += 1
175 d = self._publisher.callRemote("subscribe_v2",
177 self._my_subscriber_info)
178 d.addBoth(self._debug_retired)
180 d = self._subscribe_handle_v1(service_name) # for_v1
181 d.addErrback(log.err, facility="tahoe.introducer.client",
182 level=log.WEIRD, umid="2uMScQ")
184 def _subscribe_handle_v1(self, service_name): # for_v1
185 # they don't speak V2: must be a v1 introducer. Fall back to the v1
186 # 'subscribe' method, using a client adapter.
187 ca = WrapV2ClientInV1Interface(self)
188 self._debug_outstanding += 1
189 d = self._publisher.callRemote("subscribe", ca, service_name)
190 d.addBoth(self._debug_retired)
191 # We must also publish an empty 'stub_client' object, so the
192 # introducer can count how many clients are connected and see what
193 # versions they're running.
194 if not self._stub_client_furl:
195 self._stub_client = sc = StubClient()
196 self._stub_client_furl = self._tub.registerReference(sc)
197 def _publish_stub_client(ignored):
198 furl = self._stub_client_furl
199 self.publish("stub_client",
200 { "anonymous-storage-FURL": furl,
201 "permutation-seed-base32": get_tubid_string(furl),
203 d.addCallback(_publish_stub_client)
206 def create_announcement_dict(self, service_name, ann):
207 ann_d = { "version": 0,
208 # "seqnum" and "nonce" will be populated with new values in
209 # publish(), each time we make a change
210 "nickname": self._nickname,
211 "app-versions": self._app_versions,
212 "my-version": self._my_version,
213 "oldest-supported": self._oldest_supported,
215 "service-name": service_name,
220 def publish(self, service_name, ann, signing_key=None):
221 # we increment the seqnum every time we publish something new
222 current_seqnum, current_nonce = self._sequencer()
224 ann_d = self.create_announcement_dict(service_name, ann)
225 self._outbound_announcements[service_name] = ann_d
227 # publish all announcements with the new seqnum and nonce
228 for service_name,ann_d in self._outbound_announcements.items():
229 ann_d["seqnum"] = current_seqnum
230 ann_d["nonce"] = current_nonce
231 ann_t = sign_to_foolscap(ann_d, signing_key)
232 self._published_announcements[service_name] = ann_t
233 self._maybe_publish()
235 def _maybe_publish(self):
236 if not self._publisher:
237 self.log("want to publish, but no introducer yet", level=log.NOISY)
239 # this re-publishes everything. The Introducer ignores duplicates
240 for ann_t in self._published_announcements.values():
241 self._debug_counts["outbound_message"] += 1
242 if V2 in self._publisher.version:
243 self._debug_outstanding += 1
244 d = self._publisher.callRemote("publish_v2", ann_t,
246 d.addBoth(self._debug_retired)
248 d = self._handle_v1_publisher(ann_t) # for_v1
249 d.addErrback(log.err, ann_t=ann_t,
250 facility="tahoe.introducer.client",
251 level=log.WEIRD, umid="xs9pVQ")
253 def _handle_v1_publisher(self, ann_t): # for_v1
254 # they don't speak V2, so fall back to the old 'publish' method
255 # (which takes an unsigned tuple of bytestrings)
256 self.log("falling back to publish_v1",
257 level=log.UNUSUAL, umid="9RCT1A")
258 ann_v1 = convert_announcement_v2_to_v1(ann_t)
259 self._debug_outstanding += 1
260 d = self._publisher.callRemote("publish", ann_v1)
261 d.addBoth(self._debug_retired)
265 def remote_announce_v2(self, announcements):
266 lp = self.log("received %d announcements (v2)" % len(announcements))
267 return self.got_announcements(announcements, lp)
269 def got_announcements(self, announcements, lp=None):
270 # this is the common entry point for both v1 and v2 announcements
271 self._debug_counts["inbound_message"] += 1
272 for ann_t in announcements:
274 # this might raise UnknownKeyError or bad-sig error
275 ann, key_s = unsign_from_foolscap(ann_t)
276 # key is "v0-base32abc123"
277 except BadSignatureError:
278 self.log("bad signature on inbound announcement: %s" % (ann_t,),
279 parent=lp, level=log.WEIRD, umid="ZAU15Q")
280 # process other announcements that arrived with the bad one
283 self._process_announcement(ann, key_s)
285 def _process_announcement(self, ann, key_s):
286 self._debug_counts["inbound_announcement"] += 1
287 service_name = str(ann["service-name"])
288 if service_name not in self._subscribed_service_names:
289 self.log("announcement for a service we don't care about [%s]"
290 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
291 self._debug_counts["wrong_service"] += 1
293 # for ASCII values, simplejson might give us unicode *or* bytes
294 if "nickname" in ann and isinstance(ann["nickname"], str):
295 ann["nickname"] = unicode(ann["nickname"])
296 nick_s = ann.get("nickname",u"").encode("utf-8")
297 lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
298 nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
300 # how do we describe this node in the logs?
303 desc_bits.append("serverid=" + key_s[:20])
304 if "anonymous-storage-FURL" in ann:
305 tubid_s = get_tubid_string_from_ann(ann)
306 desc_bits.append("tubid=" + tubid_s[:8])
307 description = "/".join(desc_bits)
309 # the index is used to track duplicates
310 index = make_index(ann, key_s)
312 # is this announcement a duplicate?
313 if (index in self._inbound_announcements
314 and self._inbound_announcements[index][0] == ann):
315 self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
316 service=service_name, description=description,
317 parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
318 self._debug_counts["duplicate_announcement"] += 1
321 # does it update an existing one?
322 if index in self._inbound_announcements:
323 old,_,_ = self._inbound_announcements[index]
325 # must beat previous sequence number to replace
326 if ("seqnum" not in ann
327 or not isinstance(ann["seqnum"], (int,long))):
328 self.log("not replacing old announcement, no valid seqnum: %s"
330 parent=lp2, level=log.NOISY, umid="zFGH3Q")
332 if ann["seqnum"] <= old["seqnum"]:
333 # note that exact replays are caught earlier, by
334 # comparing the entire signed announcement.
335 self.log("not replacing old announcement, "
336 "new seqnum is too old (%s <= %s) "
337 "(replay attack?): %s"
338 % (ann["seqnum"], old["seqnum"], ann),
339 parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
341 # ok, seqnum is newer, allow replacement
342 self._debug_counts["update"] += 1
343 self.log("replacing old announcement: %s" % (ann,),
344 parent=lp2, level=log.NOISY, umid="wxwgIQ")
346 self._debug_counts["new_announcement"] += 1
347 self.log("new announcement[%s]" % service_name,
348 parent=lp2, level=log.NOISY)
350 self._inbound_announcements[index] = (ann, key_s, time.time())
351 # note: we never forget an index, but we might update its value
353 for (service_name2,cb,args,kwargs) in self._local_subscribers:
354 if service_name2 == service_name:
355 eventually(cb, key_s, ann, *args, **kwargs)
357 def remote_set_encoding_parameters(self, parameters):
358 self.encoding_parameters = parameters
360 def connected_to_introducer(self):
361 return bool(self._publisher)