]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/client.py
remove introducer's set_encoding_parameters
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
1
2 import time
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, eventually, RemoteInterface
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import IIntroducerClient, \
8      RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
9 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
10      convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
11      make_index, get_tubid_string_from_ann, get_tubid_string
12 from allmydata.util import log
13 from allmydata.util.rrefutil import add_version_to_remote_reference
14 from allmydata.util.keyutil import BadSignatureError
15
class WrapV2ClientInV1Interface(Referenceable): # for_v1
    """Adapter that exposes a v2 IntroducerClient through the v1 subscriber
    interface, so the client can be handed to an old (v1-only) introducer."""
    implements(RIIntroducerSubscriberClient_v1)

    def __init__(self, original):
        # 'original' is the wrapped client; only its log() and
        # got_announcements() methods are used by this adapter.
        self.original = original

    def remote_announce(self, announcements):
        """Convert each incoming v1 announcement to v2 form and pass the
        resulting set to the wrapped client's got_announcements()."""
        message = "received %d announcements (v1)" % len(announcements)
        lp = self.original.log(message)
        converted = []
        for ann_v1 in announcements:
            converted.append(convert_announcement_v1_to_v2(ann_v1))
        # despite arriving over the v1 interface, these are now v2-format
        anns_v2 = set(converted)
        return self.original.got_announcements(anns_v2, lp)
30
class RIStubClient(RemoteInterface): # for_v1
    """The (empty) RemoteInterface for the StubClient.

    Each client publishes a service announcement for a dummy object called
    the StubClient. That object offers no services of its own; its
    announcement merely lets the Introducer keep track of which clients are
    subscribed (so the grid admin can see things like the size of the grid
    and the client versions in use)."""
38
class StubClient(Referenceable): # for_v1
    """Dummy referenceable that declares RIStubClient and defines no remote
    methods of its own."""
    implements(RIStubClient)
41
# Keys looked up in an introducer's version dictionary to find out which
# introducer protocol(s) it speaks.
V1, V2 = (
    "http://allmydata.org/tahoe/protocols/introducer/v1",
    "http://allmydata.org/tahoe/protocols/introducer/v2",
)
44
45 class IntroducerClient(service.Service, Referenceable):
46     implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
47
48     def __init__(self, tub, introducer_furl,
49                  nickname, my_version, oldest_supported,
50                  app_versions, sequencer):
51         self._tub = tub
52         self.introducer_furl = introducer_furl
53
54         assert type(nickname) is unicode
55         self._nickname = nickname
56         self._my_version = my_version
57         self._oldest_supported = oldest_supported
58         self._app_versions = app_versions
59         self._sequencer = sequencer
60
61         self._my_subscriber_info = { "version": 0,
62                                      "nickname": self._nickname,
63                                      "app-versions": self._app_versions,
64                                      "my-version": self._my_version,
65                                      "oldest-supported": self._oldest_supported,
66                                      }
67         self._stub_client = None # for_v1
68         self._stub_client_furl = None
69
70         self._outbound_announcements = {} # not signed
71         self._published_announcements = {} # signed
72         self._canary = Referenceable()
73
74         self._publisher = None
75
76         self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
77         self._subscribed_service_names = set()
78         self._subscriptions = set() # requests we've actually sent
79
80         # _inbound_announcements remembers one announcement per
81         # (servicename,serverid) pair. Anything that arrives with the same
82         # pair will displace the previous one. This stores tuples of
83         # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
84         # dicts can be compared for equality to distinguish re-announcement
85         # from updates. It also provides memory for clients who subscribe
86         # after startup.
87         self._inbound_announcements = {}
88
89         # hooks for unit tests
90         self._debug_counts = {
91             "inbound_message": 0,
92             "inbound_announcement": 0,
93             "wrong_service": 0,
94             "duplicate_announcement": 0,
95             "update": 0,
96             "new_announcement": 0,
97             "outbound_message": 0,
98             }
99         self._debug_outstanding = 0
100
101     def _debug_retired(self, res):
102         self._debug_outstanding -= 1
103         return res
104
105     def startService(self):
106         service.Service.startService(self)
107         self._introducer_error = None
108         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
109         self._introducer_reconnector = rc
110         def connect_failed(failure):
111             self.log("Initial Introducer connection failed: perhaps it's down",
112                      level=log.WEIRD, failure=failure, umid="c5MqUQ")
113         d = self._tub.getReference(self.introducer_furl)
114         d.addErrback(connect_failed)
115
116     def _got_introducer(self, publisher):
117         self.log("connected to introducer, getting versions")
118         default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
119                     { },
120                     "application-version": "unknown: no get_version()",
121                     }
122         d = add_version_to_remote_reference(publisher, default)
123         d.addCallback(self._got_versioned_introducer)
124         d.addErrback(self._got_error)
125
126     def _got_error(self, f):
127         # TODO: for the introducer, perhaps this should halt the application
128         self._introducer_error = f # polled by tests
129
130     def _got_versioned_introducer(self, publisher):
131         self.log("got introducer version: %s" % (publisher.version,))
132         # we require an introducer that speaks at least one of (V1, V2)
133         if not (V1 in publisher.version or V2 in publisher.version):
134             raise InsufficientVersionError("V1 or V2", publisher.version)
135         self._publisher = publisher
136         publisher.notifyOnDisconnect(self._disconnected)
137         self._maybe_publish()
138         self._maybe_subscribe()
139
140     def _disconnected(self):
141         self.log("bummer, we've lost our connection to the introducer")
142         self._publisher = None
143         self._subscriptions.clear()
144
145     def log(self, *args, **kwargs):
146         if "facility" not in kwargs:
147             kwargs["facility"] = "tahoe.introducer.client"
148         return log.msg(*args, **kwargs)
149
150     def subscribe_to(self, service_name, cb, *args, **kwargs):
151         self._local_subscribers.append( (service_name,cb,args,kwargs) )
152         self._subscribed_service_names.add(service_name)
153         self._maybe_subscribe()
154         for index,(ann,key_s,when) in self._inbound_announcements.items():
155             servicename = index[0]
156             if servicename == service_name:
157                 eventually(cb, key_s, ann, *args, **kwargs)
158
159     def _maybe_subscribe(self):
160         if not self._publisher:
161             self.log("want to subscribe, but no introducer yet",
162                      level=log.NOISY)
163             return
164         for service_name in self._subscribed_service_names:
165             if service_name in self._subscriptions:
166                 continue
167             self._subscriptions.add(service_name)
168             if V2 in self._publisher.version:
169                 self._debug_outstanding += 1
170                 d = self._publisher.callRemote("subscribe_v2",
171                                                self, service_name,
172                                                self._my_subscriber_info)
173                 d.addBoth(self._debug_retired)
174             else:
175                 d = self._subscribe_handle_v1(service_name) # for_v1
176             d.addErrback(log.err, facility="tahoe.introducer.client",
177                          level=log.WEIRD, umid="2uMScQ")
178
179     def _subscribe_handle_v1(self, service_name): # for_v1
180         # they don't speak V2: must be a v1 introducer. Fall back to the v1
181         # 'subscribe' method, using a client adapter.
182         ca = WrapV2ClientInV1Interface(self)
183         self._debug_outstanding += 1
184         d = self._publisher.callRemote("subscribe", ca, service_name)
185         d.addBoth(self._debug_retired)
186         # We must also publish an empty 'stub_client' object, so the
187         # introducer can count how many clients are connected and see what
188         # versions they're running.
189         if not self._stub_client_furl:
190             self._stub_client = sc = StubClient()
191             self._stub_client_furl = self._tub.registerReference(sc)
192         def _publish_stub_client(ignored):
193             furl = self._stub_client_furl
194             self.publish("stub_client",
195                          { "anonymous-storage-FURL": furl,
196                            "permutation-seed-base32": get_tubid_string(furl),
197                            })
198         d.addCallback(_publish_stub_client)
199         return d
200
201     def create_announcement_dict(self, service_name, ann):
202         ann_d = { "version": 0,
203                   # "seqnum" and "nonce" will be populated with new values in
204                   # publish(), each time we make a change
205                   "nickname": self._nickname,
206                   "app-versions": self._app_versions,
207                   "my-version": self._my_version,
208                   "oldest-supported": self._oldest_supported,
209
210                   "service-name": service_name,
211                   }
212         ann_d.update(ann)
213         return ann_d
214
215     def publish(self, service_name, ann, signing_key=None):
216         # we increment the seqnum every time we publish something new
217         current_seqnum, current_nonce = self._sequencer()
218
219         ann_d = self.create_announcement_dict(service_name, ann)
220         self._outbound_announcements[service_name] = ann_d
221
222         # publish all announcements with the new seqnum and nonce
223         for service_name,ann_d in self._outbound_announcements.items():
224             ann_d["seqnum"] = current_seqnum
225             ann_d["nonce"] = current_nonce
226             ann_t = sign_to_foolscap(ann_d, signing_key)
227             self._published_announcements[service_name] = ann_t
228         self._maybe_publish()
229
230     def _maybe_publish(self):
231         if not self._publisher:
232             self.log("want to publish, but no introducer yet", level=log.NOISY)
233             return
234         # this re-publishes everything. The Introducer ignores duplicates
235         for ann_t in self._published_announcements.values():
236             self._debug_counts["outbound_message"] += 1
237             if V2 in self._publisher.version:
238                 self._debug_outstanding += 1
239                 d = self._publisher.callRemote("publish_v2", ann_t,
240                                                self._canary)
241                 d.addBoth(self._debug_retired)
242             else:
243                 d = self._handle_v1_publisher(ann_t) # for_v1
244             d.addErrback(log.err, ann_t=ann_t,
245                          facility="tahoe.introducer.client",
246                          level=log.WEIRD, umid="xs9pVQ")
247
248     def _handle_v1_publisher(self, ann_t): # for_v1
249         # they don't speak V2, so fall back to the old 'publish' method
250         # (which takes an unsigned tuple of bytestrings)
251         self.log("falling back to publish_v1",
252                  level=log.UNUSUAL, umid="9RCT1A")
253         ann_v1 = convert_announcement_v2_to_v1(ann_t)
254         self._debug_outstanding += 1
255         d = self._publisher.callRemote("publish", ann_v1)
256         d.addBoth(self._debug_retired)
257         return d
258
259
260     def remote_announce_v2(self, announcements):
261         lp = self.log("received %d announcements (v2)" % len(announcements))
262         return self.got_announcements(announcements, lp)
263
264     def got_announcements(self, announcements, lp=None):
265         # this is the common entry point for both v1 and v2 announcements
266         self._debug_counts["inbound_message"] += 1
267         for ann_t in announcements:
268             try:
269                 # this might raise UnknownKeyError or bad-sig error
270                 ann, key_s = unsign_from_foolscap(ann_t)
271                 # key is "v0-base32abc123"
272             except BadSignatureError:
273                 self.log("bad signature on inbound announcement: %s" % (ann_t,),
274                          parent=lp, level=log.WEIRD, umid="ZAU15Q")
275                 # process other announcements that arrived with the bad one
276                 continue
277
278             self._process_announcement(ann, key_s)
279
280     def _process_announcement(self, ann, key_s):
281         self._debug_counts["inbound_announcement"] += 1
282         service_name = str(ann["service-name"])
283         if service_name not in self._subscribed_service_names:
284             self.log("announcement for a service we don't care about [%s]"
285                      % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
286             self._debug_counts["wrong_service"] += 1
287             return
288         # for ASCII values, simplejson might give us unicode *or* bytes
289         if "nickname" in ann and isinstance(ann["nickname"], str):
290             ann["nickname"] = unicode(ann["nickname"])
291         nick_s = ann.get("nickname",u"").encode("utf-8")
292         lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
293                        nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
294
295         # how do we describe this node in the logs?
296         desc_bits = []
297         if key_s:
298             desc_bits.append("serverid=" + key_s[:20])
299         if "anonymous-storage-FURL" in ann:
300             tubid_s = get_tubid_string_from_ann(ann)
301             desc_bits.append("tubid=" + tubid_s[:8])
302         description = "/".join(desc_bits)
303
304         # the index is used to track duplicates
305         index = make_index(ann, key_s)
306
307         # is this announcement a duplicate?
308         if (index in self._inbound_announcements
309             and self._inbound_announcements[index][0] == ann):
310             self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
311                      service=service_name, description=description,
312                      parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
313             self._debug_counts["duplicate_announcement"] += 1
314             return
315
316         # does it update an existing one?
317         if index in self._inbound_announcements:
318             old,_,_ = self._inbound_announcements[index]
319             if "seqnum" in old:
320                 # must beat previous sequence number to replace
321                 if ("seqnum" not in ann
322                     or not isinstance(ann["seqnum"], (int,long))):
323                     self.log("not replacing old announcement, no valid seqnum: %s"
324                              % (ann,),
325                              parent=lp2, level=log.NOISY, umid="zFGH3Q")
326                     return
327                 if ann["seqnum"] <= old["seqnum"]:
328                     # note that exact replays are caught earlier, by
329                     # comparing the entire signed announcement.
330                     self.log("not replacing old announcement, "
331                              "new seqnum is too old (%s <= %s) "
332                              "(replay attack?): %s"
333                              % (ann["seqnum"], old["seqnum"], ann),
334                              parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
335                     return
336                 # ok, seqnum is newer, allow replacement
337             self._debug_counts["update"] += 1
338             self.log("replacing old announcement: %s" % (ann,),
339                      parent=lp2, level=log.NOISY, umid="wxwgIQ")
340         else:
341             self._debug_counts["new_announcement"] += 1
342             self.log("new announcement[%s]" % service_name,
343                      parent=lp2, level=log.NOISY)
344
345         self._inbound_announcements[index] = (ann, key_s, time.time())
346         # note: we never forget an index, but we might update its value
347
348         for (service_name2,cb,args,kwargs) in self._local_subscribers:
349             if service_name2 == service_name:
350                 eventually(cb, key_s, ann, *args, **kwargs)
351
352     def connected_to_introducer(self):
353         return bool(self._publisher)