git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/client.py
big rework of introducer client: change local API, split division of responsibilities...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
1
2 from base64 import b32decode
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap.api import Referenceable, SturdyRef, eventually
6 from allmydata.interfaces import InsufficientVersionError
7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
8      IIntroducerClient
9 from allmydata.util import log, idlib
10 from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
11
12
class IntroducerClient(service.Service, Referenceable):
    """Client side of the connection to an Introducer.

    This object publishes local announcements to the introducer named by
    introducer_furl, subscribes to the service names that local code asks
    about, and hands every new or changed announcement it receives to the
    matching local subscribers.
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Record the connection parameters; nothing is contacted here.

        tub -- object whose connectTo()/getReference() are used by
               startService() (presumably a foolscap Tub; confirm with
               the caller).
        introducer_furl -- FURL of the introducer to connect to.
        nickname -- must be a unicode object; it is encoded to UTF-8
                    once, here, and sent in that form.
        my_version, oldest_supported -- copied verbatim into every
                    announcement built by publish().
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        # announcement tuples handed to publish(); re-sent in full by
        # _maybe_publish()
        self._published_announcements = set()

        # remote reference to the introducer, or None while disconnected
        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores unpacked
        # announcement dictionaries, which can be compared for equality to
        # distinguish re-announcement from updates. It also provides memory
        # for clients who subscribe after startup.
        self._current_announcements = {}

        # replaced wholesale by remote_set_encoding_parameters()
        self.encoding_parameters = None

        # hooks for unit tests
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }

    def startService(self):
        """Start the service and begin connecting to the introducer.

        connectTo() is given _got_introducer as its callback and its
        return value is kept in _introducer_reconnector. A second,
        separate getReference() is issued only so that a failure of the
        initial connection attempt gets logged.
        """
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Connection callback: attach version info to the new reference.

        The 'default' dictionary is passed to
        add_version_to_remote_reference(); presumably it is the version
        used when the remote side has no get_version() -- confirm against
        allmydata.util.rrefutil.
        """
        self.log("connected to introducer, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Remember a failure from the version-negotiation chain."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Accept a versioned introducer reference and start using it.

        Raises InsufficientVersionError when publisher.version does not
        contain the V1 introducer protocol key.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        # flush anything that was queued while we had no connection
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the introducer reference after the connection is lost."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        # _maybe_subscribe() skips names found in _subscriptions, so it
        # must be emptied for the requests to be re-sent on reconnect
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting facility to "tahoe.introducer"."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Remember an announcement and send it if we are connected.

        The announcement is a 6-tuple of (furl, service_name,
        remoteinterface_name, UTF-8 nickname, my_version,
        oldest_supported). It is kept in a set, so publishing the same
        tuple twice stores it once.
        """
        assert type(self._nickname_utf8) is str # we always send UTF-8
        ann = (furl, service_name, remoteinterface_name,
               self._nickname_utf8, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register cb for announcements about service_name.

        cb is invoked (via eventually()) as
        cb(nodeid, ann_d, *args, **kwargs) for every new or updated
        announcement of that service, and also once for each matching
        announcement that was already remembered before this call.
        """
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        for (servicename,nodeid),ann_d in self._current_announcements.items():
            if servicename == service_name:
                # pass the subscriber's extra arguments here too, so the
                # replay uses the same call signature as the delivery in
                # _process_announcement()
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send a "subscribe" request for each not-yet-requested service.

        Does nothing (beyond logging) while there is no introducer
        connection.
        """
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(trap_deadref)
                d.addErrback(log.err, format="server errored during subscribe",
                             facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        """Send every remembered announcement to the introducer.

        Does nothing (beyond logging) while there is no introducer
        connection. Each message sent bumps the "outbound_message"
        debug counter.
        """
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            self._debug_counts["outbound_message"] += 1
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(trap_deadref)
            d.addErrback(log.err,
                         format="server errored during publish %(ann)s",
                         ann=ann, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")



    def remote_announce(self, announcements):
        """Remote entry point: process a batch of announcements.

        Each announcement is handled independently; one that cannot be
        processed is logged and skipped.
        """
        self.log("received %d announcements" % len(announcements))
        self._debug_counts["inbound_message"] += 1
        for ann in announcements:
            try:
                self._process_announcement(ann)
            except Exception:
                # Don't let a corrupt announcement prevent us from processing
                # the remaining ones. Don't return an error to the server,
                # since they'd just ignore it anyways. Only Exception is
                # caught, so interpreter-exit signals are not swallowed.
                log.err(format="unable to process announcement %(ann)s",
                        ann=ann)

    def _process_announcement(self, ann):
        """Unpack one announcement tuple and deliver it if it is news.

        Announcements for services nobody subscribed to are counted and
        dropped. Otherwise the tuple is turned into an announcement
        dictionary, compared with the one remembered for the same
        (service_name, nodeid) index, and -- unless identical -- stored
        and passed to every local subscriber of that service.

        Raises (to remote_announce) if the tuple does not have six
        elements or if one of the type assertions fails.
        """
        self._debug_counts["inbound_announcement"] += 1
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        self.log("announcement for [%s]: %s" % (service_name, ann),
                 umid="BoKEag")
        assert type(furl) is str
        assert type(service_name) is str
        assert type(ri_name) is str
        assert type(nickname_utf8) is str
        nickname = nickname_utf8.decode("utf-8")
        assert type(nickname) is unicode
        assert type(ver) is str
        assert type(oldest) is str

        # the node id is the base32-decoded tubID taken from the FURL
        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        nodeid_s = idlib.shortnodeid_b2a(nodeid)

        ann_d = { "version": 0,
                  "service-name": service_name,

                  "FURL": furl,
                  "nickname": nickname,
                  "app-versions": {}, # need #466 and v2 introducer
                  "my-version": ver,
                  "oldest-supported": oldest,
                  }

        index = (service_name, nodeid)
        if self._current_announcements.get(index, None) == ann_d:
            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
                     service=service_name, nodeid=nodeid_s,
                     level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        if index in self._current_announcements:
            self._debug_counts["update"] += 1
        else:
            self._debug_counts["new_announcement"] += 1

        self._current_announcements[index] = ann_d
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Remote entry point: store parameters as encoding_parameters."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True while an introducer reference is held."""
        return bool(self._publisher)