]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/client.py
f463c1202c0f084f426939153cb92b45959b135e
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
1
2 import time
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8      RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10      convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11      make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
15
16 class WrapV2ClientInV1Interface(Referenceable): # for_v1
17     """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
18     can be attached to an old server."""
19     implements(RIIntroducerSubscriberClient_v1)
20
21     def __init__(self, original):
22         self.original = original
23
24     def remote_announce(self, announcements):
25         lp = self.original.log("received %d announcements (v1)" %
26                                len(announcements))
27         anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
28                        for ann_v1 in announcements])
29         return self.original.got_announcements(anns_v1, lp)
30
31     def remote_set_encoding_parameters(self, parameters):
32         self.original.remote_set_encoding_parameters(parameters)
33
34 class RIStubClient(RemoteInterface): # for_v1
35     """Each client publishes a service announcement for a dummy object called
36     the StubClient. This object doesn't actually offer any services, but the
37     announcement helps the Introducer keep track of which clients are
38     subscribed (so the grid admin can keep track of things like the size of
39     the grid and the client versions in use. This is the (empty)
40     RemoteInterface for the StubClient."""
41
42 class StubClient(Referenceable): # for_v1
43     implements(RIStubClient)
44
45 V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
46 V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
47
48 class IntroducerClient(service.Service, Referenceable):
49     implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
50
51     def __init__(self, tub, introducer_furl,
52                  nickname, my_version, oldest_supported,
53                  app_versions, sequencer):
54         self._tub = tub
55         self.introducer_furl = introducer_furl
56
57         assert type(nickname) is unicode
58         self._nickname = nickname
59         self._my_version = my_version
60         self._oldest_supported = oldest_supported
61         self._app_versions = app_versions
62         self._sequencer = sequencer
63
64         self._my_subscriber_info = { "version": 0,
65                                      "nickname": self._nickname,
66                                      "app-versions": self._app_versions,
67                                      "my-version": self._my_version,
68                                      "oldest-supported": self._oldest_supported,
69                                      }
70         self._stub_client = None # for_v1
71         self._stub_client_furl = None
72
73         self._outbound_announcements = {} # not signed
74         self._published_announcements = {} # signed
75         self._canary = Referenceable()
76
77         self._publisher = None
78
79         self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
80         self._subscribed_service_names = set()
81         self._subscriptions = set() # requests we've actually sent
82
83         # _inbound_announcements remembers one announcement per
84         # (servicename,serverid) pair. Anything that arrives with the same
85         # pair will displace the previous one. This stores tuples of
86         # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
87         # dicts can be compared for equality to distinguish re-announcement
88         # from updates. It also provides memory for clients who subscribe
89         # after startup.
90         self._inbound_announcements = {}
91
92         self.encoding_parameters = None
93
94         # hooks for unit tests
95         self._debug_counts = {
96             "inbound_message": 0,
97             "inbound_announcement": 0,
98             "wrong_service": 0,
99             "duplicate_announcement": 0,
100             "update": 0,
101             "new_announcement": 0,
102             "outbound_message": 0,
103             }
104         self._debug_outstanding = 0
105
106     def _debug_retired(self, res):
107         self._debug_outstanding -= 1
108         return res
109
110     def startService(self):
111         service.Service.startService(self)
112         self._introducer_error = None
113         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
114         self._introducer_reconnector = rc
115         def connect_failed(failure):
116             self.log("Initial Introducer connection failed: perhaps it's down",
117                      level=log.WEIRD, failure=failure, umid="c5MqUQ")
118         d = self._tub.getReference(self.introducer_furl)
119         d.addErrback(connect_failed)
120
121     def _got_introducer(self, publisher):
122         self.log("connected to introducer, getting versions")
123         default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
124                     { },
125                     "application-version": "unknown: no get_version()",
126                     }
127         d = add_version_to_remote_reference(publisher, default)
128         d.addCallback(self._got_versioned_introducer)
129         d.addErrback(self._got_error)
130
131     def _got_error(self, f):
132         # TODO: for the introducer, perhaps this should halt the application
133         self._introducer_error = f # polled by tests
134
135     def _got_versioned_introducer(self, publisher):
136         self.log("got introducer version: %s" % (publisher.version,))
137         # we require an introducer that speaks at least one of (V1, V2)
138         if not (V1 in publisher.version or V2 in publisher.version):
139             raise InsufficientVersionError("V1 or V2", publisher.version)
140         self._publisher = publisher
141         publisher.notifyOnDisconnect(self._disconnected)
142         self._maybe_publish()
143         self._maybe_subscribe()
144
145     def _disconnected(self):
146         self.log("bummer, we've lost our connection to the introducer")
147         self._publisher = None
148         self._subscriptions.clear()
149
150     def log(self, *args, **kwargs):
151         if "facility" not in kwargs:
152             kwargs["facility"] = "tahoe.introducer.client"
153         return log.msg(*args, **kwargs)
154
155     def subscribe_to(self, service_name, cb, *args, **kwargs):
156         self._local_subscribers.append( (service_name,cb,args,kwargs) )
157         self._subscribed_service_names.add(service_name)
158         self._maybe_subscribe()
159         for index,(ann,key_s,when) in self._inbound_announcements.items():
160             servicename = index[0]
161             if servicename == service_name:
162                 eventually(cb, key_s, ann, *args, **kwargs)
163
164     def _maybe_subscribe(self):
165         if not self._publisher:
166             self.log("want to subscribe, but no introducer yet",
167                      level=log.NOISY)
168             return
169         for service_name in self._subscribed_service_names:
170             if service_name in self._subscriptions:
171                 continue
172             self._subscriptions.add(service_name)
173             if V2 in self._publisher.version:
174                 self._debug_outstanding += 1
175                 d = self._publisher.callRemote("subscribe_v2",
176                                                self, service_name,
177                                                self._my_subscriber_info)
178                 d.addBoth(self._debug_retired)
179             else:
180                 d = self._subscribe_handle_v1(service_name) # for_v1
181             d.addErrback(log.err, facility="tahoe.introducer.client",
182                          level=log.WEIRD, umid="2uMScQ")
183
184     def _subscribe_handle_v1(self, service_name): # for_v1
185         # they don't speak V2: must be a v1 introducer. Fall back to the v1
186         # 'subscribe' method, using a client adapter.
187         ca = WrapV2ClientInV1Interface(self)
188         self._debug_outstanding += 1
189         d = self._publisher.callRemote("subscribe", ca, service_name)
190         d.addBoth(self._debug_retired)
191         # We must also publish an empty 'stub_client' object, so the
192         # introducer can count how many clients are connected and see what
193         # versions they're running.
194         if not self._stub_client_furl:
195             self._stub_client = sc = StubClient()
196             self._stub_client_furl = self._tub.registerReference(sc)
197         def _publish_stub_client(ignored):
198             furl = self._stub_client_furl
199             self.publish("stub_client",
200                          { "anonymous-storage-FURL": furl,
201                            "permutation-seed-base32": get_tubid_string(furl),
202                            })
203         d.addCallback(_publish_stub_client)
204         return d
205
206     def create_announcement_dict(self, service_name, ann):
207         ann_d = { "version": 0,
208                   # "seqnum" and "nonce" will be populated with new values in
209                   # publish(), each time we make a change
210                   "nickname": self._nickname,
211                   "app-versions": self._app_versions,
212                   "my-version": self._my_version,
213                   "oldest-supported": self._oldest_supported,
214
215                   "service-name": service_name,
216                   }
217         ann_d.update(ann)
218         return ann_d
219
220     def publish(self, service_name, ann, signing_key=None):
221         # we increment the seqnum every time we publish something new
222         current_seqnum, current_nonce = self._sequencer()
223
224         ann_d = self.create_announcement_dict(service_name, ann)
225         self._outbound_announcements[service_name] = ann_d
226
227         # publish all announcements with the new seqnum and nonce
228         for service_name,ann_d in self._outbound_announcements.items():
229             ann_d["seqnum"] = current_seqnum
230             ann_d["nonce"] = current_nonce
231             ann_t = sign_to_foolscap(ann_d, signing_key)
232             self._published_announcements[service_name] = ann_t
233         self._maybe_publish()
234
235     def _maybe_publish(self):
236         if not self._publisher:
237             self.log("want to publish, but no introducer yet", level=log.NOISY)
238             return
239         # this re-publishes everything. The Introducer ignores duplicates
240         for ann_t in self._published_announcements.values():
241             self._debug_counts["outbound_message"] += 1
242             if V2 in self._publisher.version:
243                 self._debug_outstanding += 1
244                 d = self._publisher.callRemote("publish_v2", ann_t,
245                                                self._canary)
246                 d.addBoth(self._debug_retired)
247             else:
248                 d = self._handle_v1_publisher(ann_t) # for_v1
249             d.addErrback(log.err, ann_t=ann_t,
250                          facility="tahoe.introducer.client",
251                          level=log.WEIRD, umid="xs9pVQ")
252
253     def _handle_v1_publisher(self, ann_t): # for_v1
254         # they don't speak V2, so fall back to the old 'publish' method
255         # (which takes an unsigned tuple of bytestrings)
256         self.log("falling back to publish_v1",
257                  level=log.UNUSUAL, umid="9RCT1A")
258         ann_v1 = convert_announcement_v2_to_v1(ann_t)
259         self._debug_outstanding += 1
260         d = self._publisher.callRemote("publish", ann_v1)
261         d.addBoth(self._debug_retired)
262         return d
263
264
265     def remote_announce_v2(self, announcements):
266         lp = self.log("received %d announcements (v2)" % len(announcements))
267         return self.got_announcements(announcements, lp)
268
269     def got_announcements(self, announcements, lp=None):
270         # this is the common entry point for both v1 and v2 announcements
271         self._debug_counts["inbound_message"] += 1
272         for ann_t in announcements:
273             try:
274                 # this might raise UnknownKeyError or bad-sig error
275                 ann, key_s = unsign_from_foolscap(ann_t)
276                 # key is "v0-base32abc123"
277             except BadSignatureError:
278                 self.log("bad signature on inbound announcement: %s" % (ann_t,),
279                          parent=lp, level=log.WEIRD, umid="ZAU15Q")
280                 # process other announcements that arrived with the bad one
281                 continue
282
283             self._process_announcement(ann, key_s)
284
285     def _process_announcement(self, ann, key_s):
286         self._debug_counts["inbound_announcement"] += 1
287         service_name = str(ann["service-name"])
288         if service_name not in self._subscribed_service_names:
289             self.log("announcement for a service we don't care about [%s]"
290                      % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
291             self._debug_counts["wrong_service"] += 1
292             return
293         # for ASCII values, simplejson might give us unicode *or* bytes
294         if "nickname" in ann and isinstance(ann["nickname"], str):
295             ann["nickname"] = unicode(ann["nickname"])
296         nick_s = ann.get("nickname",u"").encode("utf-8")
297         lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
298                        nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
299
300         # how do we describe this node in the logs?
301         desc_bits = []
302         if key_s:
303             desc_bits.append("serverid=" + key_s[:20])
304         if "anonymous-storage-FURL" in ann:
305             tubid_s = get_tubid_string_from_ann(ann)
306             desc_bits.append("tubid=" + tubid_s[:8])
307         description = "/".join(desc_bits)
308
309         # the index is used to track duplicates
310         index = make_index(ann, key_s)
311
312         # is this announcement a duplicate?
313         if (index in self._inbound_announcements
314             and self._inbound_announcements[index][0] == ann):
315             self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
316                      service=service_name, description=description,
317                      parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
318             self._debug_counts["duplicate_announcement"] += 1
319             return
320
321         # does it update an existing one?
322         if index in self._inbound_announcements:
323             old,_,_ = self._inbound_announcements[index]
324             if "seqnum" in old:
325                 # must beat previous sequence number to replace
326                 if ("seqnum" not in ann
327                     or not isinstance(ann["seqnum"], (int,long))):
328                     self.log("not replacing old announcement, no valid seqnum: %s"
329                              % (ann,),
330                              parent=lp2, level=log.NOISY, umid="zFGH3Q")
331                     return
332                 if ann["seqnum"] <= old["seqnum"]:
333                     # note that exact replays are caught earlier, by
334                     # comparing the entire signed announcement.
335                     self.log("not replacing old announcement, "
336                              "new seqnum is too old (%s <= %s) "
337                              "(replay attack?): %s"
338                              % (ann["seqnum"], old["seqnum"], ann),
339                              parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
340                     return
341                 # ok, seqnum is newer, allow replacement
342             self._debug_counts["update"] += 1
343             self.log("replacing old announcement: %s" % (ann,),
344                      parent=lp2, level=log.NOISY, umid="wxwgIQ")
345         else:
346             self._debug_counts["new_announcement"] += 1
347             self.log("new announcement[%s]" % service_name,
348                      parent=lp2, level=log.NOISY)
349
350         self._inbound_announcements[index] = (ann, key_s, time.time())
351         # note: we never forget an index, but we might update its value
352
353         for (service_name2,cb,args,kwargs) in self._local_subscribers:
354             if service_name2 == service_name:
355                 eventually(cb, key_s, ann, *args, **kwargs)
356
357     def remote_set_encoding_parameters(self, parameters):
358         self.encoding_parameters = parameters
359
360     def connected_to_introducer(self):
361         return bool(self._publisher)