]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/client.py
new introducer: signed extensible dictionary-based messages! refs #466
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
1
2 import time
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8      RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10      convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11      make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
15
class WrapV2ClientInV1Interface(Referenceable): # for_v1
    """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
    can be attached to an old server.

    Inbound v1 announcements are converted to the v2 form and handed to the
    wrapped client's got_announcements(); encoding-parameter updates are
    passed straight through."""
    implements(RIIntroducerSubscriberClient_v1)

    def __init__(self, original):
        # 'original' is the v2 IntroducerClient that does the real work
        self.original = original

    def remote_announce(self, announcements):
        """Accept a batch of v1 announcements from the introducer, upgrade
        each one with convert_announcement_v1_to_v2(), and forward the
        resulting set to the wrapped client. Returns whatever the wrapped
        client's got_announcements() returns."""
        count = len(announcements)
        lp = self.original.log("received %d announcements (v1)" % count)
        # despite arriving over the v1 interface, everything past this point
        # is in v2 form
        upgraded = set(convert_announcement_v1_to_v2(old_ann)
                       for old_ann in announcements)
        return self.original.got_announcements(upgraded, lp)

    def remote_set_encoding_parameters(self, parameters):
        """Relay encoding parameters to the wrapped client (no return
        value)."""
        self.original.remote_set_encoding_parameters(parameters)
33
class RIStubClient(RemoteInterface): # for_v1
    """Each client publishes a service announcement for a dummy object called
    the StubClient. This object doesn't actually offer any services, but the
    announcement helps the Introducer keep track of which clients are
    subscribed (so the grid admin can keep track of things like the size of
    the grid and the client versions in use). This is the (empty)
    RemoteInterface for the StubClient."""
41
class StubClient(Referenceable): # for_v1
    """Empty Referenceable implementing RIStubClient. An instance is
    registered with the Tub and announced under the 'stub_client' service
    name when IntroducerClient falls back to the v1 subscribe path."""
    implements(RIStubClient)
44
# Keys looked up in the introducer's advertised version dictionary
# (publisher.version) to decide which introducer protocol to speak.
V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
47
class IntroducerClient(service.Service, Referenceable):
    """I am the client side of the Introducer protocol.

    Once started, I connect to the introducer named by introducer_furl,
    publish the announcements handed to publish(), and deliver inbound
    announcements for subscribed service names to the callbacks registered
    with subscribe_to(). If the introducer advertises V2 I use the v2 remote
    methods; otherwise I fall back to the v1 methods (the pieces marked
    'for_v1')."""
    implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported,
                 app_versions):
        """Remember the connection parameters and our own version
        information. No network activity happens until startService().

        nickname must be exactly a 'unicode' object (enforced by assert)."""
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported
        self._app_versions = app_versions

        # sent along with every v2 subscription request
        self._my_subscriber_info = {
            "version": 0,
            "nickname": nickname,
            "app-versions": app_versions,
            "my-version": my_version,
            "oldest-supported": oldest_supported,
            }

        # for_v1: created lazily the first time we subscribe to a v1
        # introducer
        self._stub_client = None
        self._stub_client_furl = None

        # service_name -> signed announcement, re-sent on every (re)connect
        self._published_announcements = {}
        self._canary = Referenceable()

        # the versioned remote reference to the introducer, or None while
        # disconnected
        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores tuples of
        # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
        # dicts can be compared for equality to distinguish re-announcement
        # from updates. It also provides memory for clients who subscribe
        # after startup.
        self._current_announcements = {}

        self.encoding_parameters = None

        # hooks for unit tests: every counter starts at zero
        self._debug_counts = dict.fromkeys(("inbound_message",
                                            "inbound_announcement",
                                            "wrong_service",
                                            "duplicate_announcement",
                                            "update",
                                            "new_announcement",
                                            "outbound_message",
                                            ), 0)
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """Deferred pass-through: note that one outstanding remote call has
        finished (successfully or not) and hand the result along unchanged."""
        self._debug_outstanding -= 1
        return res

    def startService(self):
        """Start the service and begin connecting to the introducer.

        connectTo() arranges for _got_introducer to be called with the
        connection; the separate getReference() call exists only so that a
        failure of the first attempt gets logged."""
        service.Service.startService(self)
        self._introducer_error = None
        self._introducer_reconnector = self._tub.connectTo(
            self.introducer_furl, self._got_introducer)

        def _log_initial_failure(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        first_attempt = self._tub.getReference(self.introducer_furl)
        first_attempt.addErrback(_log_initial_failure)

    def _got_introducer(self, publisher):
        """Connection callback: ask the introducer for its version info
        before using it."""
        self.log("connected to introducer, getting versions")
        # version dictionary to assume when the introducer can't tell us
        fallback_version = {
            V1: { },
            "application-version": "unknown: no get_version()",
            }
        d = add_version_to_remote_reference(publisher, fallback_version)
        # two separate calls on purpose: _got_error must also see failures
        # raised inside _got_versioned_introducer
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Record a failure from the version-negotiation chain."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Adopt 'publisher' as our introducer connection, then send any
        pending publishes and subscriptions.

        Raises InsufficientVersionError when the introducer advertises
        neither V1 nor V2."""
        advertised = publisher.version
        self.log("got introducer version: %s" % (advertised,))
        # we require an introducer that speaks at least one of (V1, V2)
        if V1 not in advertised and V2 not in advertised:
            raise InsufficientVersionError("V1 or V2", advertised)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the introducer connection and the subscription requests
        sent over it, so they are re-sent on the next connection."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Emit a log message via log.msg(), defaulting the facility to
        'tahoe.introducer.client'. Returns log.msg()'s return value."""
        kwargs.setdefault("facility", "tahoe.introducer.client")
        return log.msg(*args, **kwargs)

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register cb to be invoked (through eventually()) as
        cb(key_s, ann, *args, **kwargs) for each announcement of
        service_name, including announcements we already hold."""
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        # replay what we already know about this service
        for index,(ann,key_s,rxtime) in self._current_announcements.items():
            if index[0] != service_name:
                continue
            eventually(cb, key_s, ann, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send a subscription request for every wanted service name that
        has not been requested on the current connection. Does nothing
        (beyond logging) while disconnected."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for wanted in self._subscribed_service_names:
            if wanted in self._subscriptions:
                continue
            self._subscriptions.add(wanted)
            if V2 not in self._publisher.version:
                d = self._subscribe_handle_v1(wanted) # for_v1
            else:
                self._debug_outstanding += 1
                d = self._publisher.callRemote("subscribe_v2",
                                               self, wanted,
                                               self._my_subscriber_info)
                d.addBoth(self._debug_retired)
            d.addErrback(log.err, facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="2uMScQ")

    def _subscribe_handle_v1(self, service_name): # for_v1
        """Subscribe through the v1 'subscribe' method and, once that
        succeeds, publish the stub_client announcement. Returns the
        Deferred for the whole sequence."""
        # they don't speak V2: must be a v1 introducer. Fall back to the v1
        # 'subscribe' method, using a client adapter.
        adapter = WrapV2ClientInV1Interface(self)
        self._debug_outstanding += 1
        d = self._publisher.callRemote("subscribe", adapter, service_name)
        d.addBoth(self._debug_retired)
        # We must also publish an empty 'stub_client' object, so the
        # introducer can count how many clients are connected and see what
        # versions they're running.
        if not self._stub_client_furl:
            stub = StubClient()
            self._stub_client = stub
            self._stub_client_furl = self._tub.registerReference(stub)
        def _publish_stub_client(ignored):
            stub_furl = self._stub_client_furl
            stub_ann = { "anonymous-storage-FURL": stub_furl,
                         "permutation-seed-base32": get_tubid_string(stub_furl),
                         }
            self.publish("stub_client", stub_ann)
        d.addCallback(_publish_stub_client)
        return d

    def create_announcement(self, service_name, ann, signing_key):
        """Merge 'ann' over our standard version/nickname fields and return
        the result of sign_to_foolscap() on the combined dictionary. Keys in
        'ann' override the standard ones."""
        full_ann = { "version": 0,
                     "nickname": self._nickname,
                     "app-versions": self._app_versions,
                     "my-version": self._my_version,
                     "oldest-supported": self._oldest_supported,

                     "service-name": service_name,
                     }
        full_ann.update(ann)
        signed = sign_to_foolscap(full_ann, signing_key)
        return signed

    def publish(self, service_name, ann, signing_key=None):
        """Remember an announcement for service_name (replacing any earlier
        one for the same name) and send it if we are connected."""
        self._published_announcements[service_name] = \
            self.create_announcement(service_name, ann, signing_key)
        self._maybe_publish()

    def _maybe_publish(self):
        """Send every remembered announcement to the introducer. Does
        nothing (beyond logging) while disconnected."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for signed_ann in self._published_announcements.values():
            self._debug_counts["outbound_message"] += 1
            if V2 not in self._publisher.version:
                d = self._handle_v1_publisher(signed_ann) # for_v1
            else:
                self._debug_outstanding += 1
                d = self._publisher.callRemote("publish_v2", signed_ann,
                                               self._canary)
                d.addBoth(self._debug_retired)
            d.addErrback(log.err, ann_t=signed_ann,
                         facility="tahoe.introducer.client",
                         level=log.WEIRD, umid="xs9pVQ")

    def _handle_v1_publisher(self, ann_t): # for_v1
        """Publish ann_t through the v1 'publish' method after converting it
        with convert_announcement_v2_to_v1(). Returns the call's Deferred."""
        # they don't speak V2, so fall back to the old 'publish' method
        # (which takes an unsigned tuple of bytestrings)
        self.log("falling back to publish_v1",
                 level=log.UNUSUAL, umid="9RCT1A")
        downgraded = convert_announcement_v2_to_v1(ann_t)
        self._debug_outstanding += 1
        d = self._publisher.callRemote("publish", downgraded)
        d.addBoth(self._debug_retired)
        return d


    def remote_announce_v2(self, announcements):
        """Remote entry point for a batch of v2 announcements."""
        count = len(announcements)
        lp = self.log("received %d announcements (v2)" % count)
        return self.got_announcements(announcements, lp)

    def got_announcements(self, announcements, lp=None):
        """Unpack each announcement in the batch and process it.
        Announcements with a bad signature are logged and skipped; the rest
        of the batch is still handled."""
        # this is the common entry point for both v1 and v2 announcements
        self._debug_counts["inbound_message"] += 1
        for ann_t in announcements:
            try:
                # this might raise UnknownKeyError or bad-sig error
                # NOTE(review): only BadSignatureError is handled below;
                # confirm whether UnknownKeyError is meant to propagate.
                ann, key_s = unsign_from_foolscap(ann_t)
                # key is "v0-base32abc123"
            except BadSignatureError:
                self.log("bad signature on inbound announcement: %s" % (ann_t,),
                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
                # process other announcements that arrived with the bad one
            else:
                self._process_announcement(ann, key_s)

    def _process_announcement(self, ann, key_s):
        """Handle one unpacked announcement: ignore it if we aren't
        subscribed to its service or if it is identical to what we already
        hold; otherwise store it and notify matching local subscribers."""
        self._debug_counts["inbound_announcement"] += 1
        service_name = str(ann["service-name"])
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        # for ASCII values, simplejson might give us unicode *or* bytes
        if isinstance(ann.get("nickname"), str):
            ann["nickname"] = unicode(ann["nickname"])
        nick_s = ann.get("nickname",u"").encode("utf-8")
        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
                       nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")

        # how do we describe this node in the logs?
        pieces = []
        if key_s:
            pieces.append("serverid=" + key_s[:20])
        if "anonymous-storage-FURL" in ann:
            tubid_s = get_tubid_string_from_ann(ann)
            pieces.append("tubid=" + tubid_s[:8])
        description = "/".join(pieces)

        # the index is used to track duplicates
        index = make_index(ann, key_s)
        # stored values are (ann, key_s, rxtime) tuples, so None reliably
        # means "nothing stored under this index yet"
        previous = self._current_announcements.get(index)

        # is this announcement a duplicate?
        if previous is not None and previous[0] == ann:
            self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
                     service=service_name, description=description,
                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        # does it update an existing one?
        if previous is not None:
            self._debug_counts["update"] += 1
            self.log("replacing old announcement: %s" % (ann,),
                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
        else:
            self._debug_counts["new_announcement"] += 1
            self.log("new announcement[%s]" % service_name,
                     parent=lp2, level=log.NOISY)

        self._current_announcements[index] = (ann, key_s, time.time())
        # note: we never forget an index, but we might update its value

        for (wanted_name,cb,args,kwargs) in self._local_subscribers:
            if wanted_name == service_name:
                eventually(cb, key_s, ann, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Store the encoding parameters supplied by the introducer."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True while we hold a live introducer reference."""
        return bool(self._publisher)