3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8 RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10 convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11 make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
16 class WrapV2ClientInV1Interface(Referenceable): # for_v1
17 """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
18 can be attached to an old server."""
19 implements(RIIntroducerSubscriberClient_v1)
21 def __init__(self, original):
22 self.original = original
24 def remote_announce(self, announcements):
25 lp = self.original.log("received %d announcements (v1)" %
27 anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
28 for ann_v1 in announcements])
29 return self.original.got_announcements(anns_v1, lp)
31 class RIStubClient(RemoteInterface): # for_v1
32 """Each client publishes a service announcement for a dummy object called
33 the StubClient. This object doesn't actually offer any services, but the
34 announcement helps the Introducer keep track of which clients are
35 subscribed (so the grid admin can keep track of things like the size of
36 the grid and the client versions in use. This is the (empty)
37 RemoteInterface for the StubClient."""
39 class StubClient(Referenceable): # for_v1
40 implements(RIStubClient)
42 V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
43 V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
45 class IntroducerClient(service.Service, Referenceable):
46 implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
48 def __init__(self, tub, introducer_furl,
49 nickname, my_version, oldest_supported,
50 app_versions, sequencer):
52 self.introducer_furl = introducer_furl
54 assert type(nickname) is unicode
55 self._nickname = nickname
56 self._my_version = my_version
57 self._oldest_supported = oldest_supported
58 self._app_versions = app_versions
59 self._sequencer = sequencer
61 self._my_subscriber_info = { "version": 0,
62 "nickname": self._nickname,
63 "app-versions": self._app_versions,
64 "my-version": self._my_version,
65 "oldest-supported": self._oldest_supported,
67 self._stub_client = None # for_v1
68 self._stub_client_furl = None
70 self._outbound_announcements = {} # not signed
71 self._published_announcements = {} # signed
72 self._canary = Referenceable()
74 self._publisher = None
76 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
77 self._subscribed_service_names = set()
78 self._subscriptions = set() # requests we've actually sent
80 # _inbound_announcements remembers one announcement per
81 # (servicename,serverid) pair. Anything that arrives with the same
82 # pair will displace the previous one. This stores tuples of
83 # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
84 # dicts can be compared for equality to distinguish re-announcement
85 # from updates. It also provides memory for clients who subscribe
87 self._inbound_announcements = {}
89 # hooks for unit tests
90 self._debug_counts = {
92 "inbound_announcement": 0,
94 "duplicate_announcement": 0,
96 "new_announcement": 0,
97 "outbound_message": 0,
99 self._debug_outstanding = 0
101 def _debug_retired(self, res):
102 self._debug_outstanding -= 1
105 def startService(self):
106 service.Service.startService(self)
107 self._introducer_error = None
108 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
109 self._introducer_reconnector = rc
110 def connect_failed(failure):
111 self.log("Initial Introducer connection failed: perhaps it's down",
112 level=log.WEIRD, failure=failure, umid="c5MqUQ")
113 d = self._tub.getReference(self.introducer_furl)
114 d.addErrback(connect_failed)
116 def _got_introducer(self, publisher):
117 self.log("connected to introducer, getting versions")
118 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
120 "application-version": "unknown: no get_version()",
122 d = add_version_to_remote_reference(publisher, default)
123 d.addCallback(self._got_versioned_introducer)
124 d.addErrback(self._got_error)
126 def _got_error(self, f):
127 # TODO: for the introducer, perhaps this should halt the application
128 self._introducer_error = f # polled by tests
130 def _got_versioned_introducer(self, publisher):
131 self.log("got introducer version: %s" % (publisher.version,))
132 # we require an introducer that speaks at least one of (V1, V2)
133 if not (V1 in publisher.version or V2 in publisher.version):
134 raise InsufficientVersionError("V1 or V2", publisher.version)
135 self._publisher = publisher
136 publisher.notifyOnDisconnect(self._disconnected)
137 self._maybe_publish()
138 self._maybe_subscribe()
140 def _disconnected(self):
141 self.log("bummer, we've lost our connection to the introducer")
142 self._publisher = None
143 self._subscriptions.clear()
145 def log(self, *args, **kwargs):
146 if "facility" not in kwargs:
147 kwargs["facility"] = "tahoe.introducer.client"
148 return log.msg(*args, **kwargs)
150 def subscribe_to(self, service_name, cb, *args, **kwargs):
151 self._local_subscribers.append( (service_name,cb,args,kwargs) )
152 self._subscribed_service_names.add(service_name)
153 self._maybe_subscribe()
154 for index,(ann,key_s,when) in self._inbound_announcements.items():
155 servicename = index[0]
156 if servicename == service_name:
157 eventually(cb, key_s, ann, *args, **kwargs)
159 def _maybe_subscribe(self):
160 if not self._publisher:
161 self.log("want to subscribe, but no introducer yet",
164 for service_name in self._subscribed_service_names:
165 if service_name in self._subscriptions:
167 self._subscriptions.add(service_name)
168 if V2 in self._publisher.version:
169 self._debug_outstanding += 1
170 d = self._publisher.callRemote("subscribe_v2",
172 self._my_subscriber_info)
173 d.addBoth(self._debug_retired)
175 d = self._subscribe_handle_v1(service_name) # for_v1
176 d.addErrback(log.err, facility="tahoe.introducer.client",
177 level=log.WEIRD, umid="2uMScQ")
179 def _subscribe_handle_v1(self, service_name): # for_v1
180 # they don't speak V2: must be a v1 introducer. Fall back to the v1
181 # 'subscribe' method, using a client adapter.
182 ca = WrapV2ClientInV1Interface(self)
183 self._debug_outstanding += 1
184 d = self._publisher.callRemote("subscribe", ca, service_name)
185 d.addBoth(self._debug_retired)
186 # We must also publish an empty 'stub_client' object, so the
187 # introducer can count how many clients are connected and see what
188 # versions they're running.
189 if not self._stub_client_furl:
190 self._stub_client = sc = StubClient()
191 self._stub_client_furl = self._tub.registerReference(sc)
192 def _publish_stub_client(ignored):
193 furl = self._stub_client_furl
194 self.publish("stub_client",
195 { "anonymous-storage-FURL": furl,
196 "permutation-seed-base32": get_tubid_string(furl),
198 d.addCallback(_publish_stub_client)
201 def create_announcement_dict(self, service_name, ann):
202 ann_d = { "version": 0,
203 # "seqnum" and "nonce" will be populated with new values in
204 # publish(), each time we make a change
205 "nickname": self._nickname,
206 "app-versions": self._app_versions,
207 "my-version": self._my_version,
208 "oldest-supported": self._oldest_supported,
210 "service-name": service_name,
215 def publish(self, service_name, ann, signing_key=None):
216 # we increment the seqnum every time we publish something new
217 current_seqnum, current_nonce = self._sequencer()
219 ann_d = self.create_announcement_dict(service_name, ann)
220 self._outbound_announcements[service_name] = ann_d
222 # publish all announcements with the new seqnum and nonce
223 for service_name,ann_d in self._outbound_announcements.items():
224 ann_d["seqnum"] = current_seqnum
225 ann_d["nonce"] = current_nonce
226 ann_t = sign_to_foolscap(ann_d, signing_key)
227 self._published_announcements[service_name] = ann_t
228 self._maybe_publish()
230 def _maybe_publish(self):
231 if not self._publisher:
232 self.log("want to publish, but no introducer yet", level=log.NOISY)
234 # this re-publishes everything. The Introducer ignores duplicates
235 for ann_t in self._published_announcements.values():
236 self._debug_counts["outbound_message"] += 1
237 if V2 in self._publisher.version:
238 self._debug_outstanding += 1
239 d = self._publisher.callRemote("publish_v2", ann_t,
241 d.addBoth(self._debug_retired)
243 d = self._handle_v1_publisher(ann_t) # for_v1
244 d.addErrback(log.err, ann_t=ann_t,
245 facility="tahoe.introducer.client",
246 level=log.WEIRD, umid="xs9pVQ")
248 def _handle_v1_publisher(self, ann_t): # for_v1
249 # they don't speak V2, so fall back to the old 'publish' method
250 # (which takes an unsigned tuple of bytestrings)
251 self.log("falling back to publish_v1",
252 level=log.UNUSUAL, umid="9RCT1A")
253 ann_v1 = convert_announcement_v2_to_v1(ann_t)
254 self._debug_outstanding += 1
255 d = self._publisher.callRemote("publish", ann_v1)
256 d.addBoth(self._debug_retired)
260 def remote_announce_v2(self, announcements):
261 lp = self.log("received %d announcements (v2)" % len(announcements))
262 return self.got_announcements(announcements, lp)
264 def got_announcements(self, announcements, lp=None):
265 # this is the common entry point for both v1 and v2 announcements
266 self._debug_counts["inbound_message"] += 1
267 for ann_t in announcements:
269 # this might raise UnknownKeyError or bad-sig error
270 ann, key_s = unsign_from_foolscap(ann_t)
271 # key is "v0-base32abc123"
272 except BadSignatureError:
273 self.log("bad signature on inbound announcement: %s" % (ann_t,),
274 parent=lp, level=log.WEIRD, umid="ZAU15Q")
275 # process other announcements that arrived with the bad one
278 self._process_announcement(ann, key_s)
280 def _process_announcement(self, ann, key_s):
281 self._debug_counts["inbound_announcement"] += 1
282 service_name = str(ann["service-name"])
283 if service_name not in self._subscribed_service_names:
284 self.log("announcement for a service we don't care about [%s]"
285 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
286 self._debug_counts["wrong_service"] += 1
288 # for ASCII values, simplejson might give us unicode *or* bytes
289 if "nickname" in ann and isinstance(ann["nickname"], str):
290 ann["nickname"] = unicode(ann["nickname"])
291 nick_s = ann.get("nickname",u"").encode("utf-8")
292 lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
293 nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
295 # how do we describe this node in the logs?
298 desc_bits.append("serverid=" + key_s[:20])
299 if "anonymous-storage-FURL" in ann:
300 tubid_s = get_tubid_string_from_ann(ann)
301 desc_bits.append("tubid=" + tubid_s[:8])
302 description = "/".join(desc_bits)
304 # the index is used to track duplicates
305 index = make_index(ann, key_s)
307 # is this announcement a duplicate?
308 if (index in self._inbound_announcements
309 and self._inbound_announcements[index][0] == ann):
310 self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
311 service=service_name, description=description,
312 parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
313 self._debug_counts["duplicate_announcement"] += 1
316 # does it update an existing one?
317 if index in self._inbound_announcements:
318 old,_,_ = self._inbound_announcements[index]
320 # must beat previous sequence number to replace
321 if ("seqnum" not in ann
322 or not isinstance(ann["seqnum"], (int,long))):
323 self.log("not replacing old announcement, no valid seqnum: %s"
325 parent=lp2, level=log.NOISY, umid="zFGH3Q")
327 if ann["seqnum"] <= old["seqnum"]:
328 # note that exact replays are caught earlier, by
329 # comparing the entire signed announcement.
330 self.log("not replacing old announcement, "
331 "new seqnum is too old (%s <= %s) "
332 "(replay attack?): %s"
333 % (ann["seqnum"], old["seqnum"], ann),
334 parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
336 # ok, seqnum is newer, allow replacement
337 self._debug_counts["update"] += 1
338 self.log("replacing old announcement: %s" % (ann,),
339 parent=lp2, level=log.NOISY, umid="wxwgIQ")
341 self._debug_counts["new_announcement"] += 1
342 self.log("new announcement[%s]" % service_name,
343 parent=lp2, level=log.NOISY)
345 self._inbound_announcements[index] = (ann, key_s, time.time())
346 # note: we never forget an index, but we might update its value
348 for (service_name2,cb,args,kwargs) in self._local_subscribers:
349 if service_name2 == service_name:
350 eventually(cb, key_s, ann, *args, **kwargs)
352 def connected_to_introducer(self):
353 return bool(self._publisher)