3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10 RemoteInterface, Referenceable, eventually, SturdyRef
11 FURL = StringConstraint(1000)
13 # We keep a copy of the old introducer (both client and server) here to
14 # support compatibility tests. The old client is supposed to handle the new
15 # server, and new client is supposed to handle the old server.
18 # Announcements are (FURL, service_name, remoteinterface_name,
19 # nickname, my_version, oldest_supported)
20 # the (FURL, service_name, remoteinterface_name) refer to the service being
21 # announced. The (nickname, my_version, oldest_supported) refer to the
22 # client as a whole. The my_version/oldest_supported strings can be parsed
23 # by an allmydata.util.version.Version instance, and then compared. The
24 # first goal is to make sure that nodes are not confused by speaking to an
25 # incompatible peer. The second goal is to enable the development of
26 # backwards-compatibility code.
28 Announcement = TupleOf(FURL, str, str,
31 class RIIntroducerSubscriberClient_v1(RemoteInterface):
32 __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
34 def announce(announcements=SetOf(Announcement)):
35 """I accept announcements from the publisher."""
38 def set_encoding_parameters(parameters=(int, int, int)):
39 """Advise the client of the recommended k-of-n encoding parameters
40 for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
41 is the total number of shares that will be created for any given
42 file, while 'k' is the number of shares that must be retrieved to
43 recover that file, and 'desired' is the minimum number of shares that
44 must be placed before the uploader will consider its job a success.
45 n/k is the expansion ratio, while k determines the robustness.
47 Introducers should specify 'n' according to the expected size of the
48 grid (there is no point to producing more shares than there are
49 peers), and k according to the desired reliability-vs-overhead goals.
51 Note that setting k=1 is equivalent to simple replication.
55 # When Foolscap can handle multiple interfaces (Foolscap#17), the
56 # full-powered introducer will implement both RIIntroducerPublisher and
57 # RIIntroducerSubscriberService. Until then, we define
58 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
59 # make everybody use that.
61 class RIIntroducerPublisher_v1(RemoteInterface):
62 """To publish a service to the world, connect to me and give me your
63 announcement message. I will deliver a copy to all connected subscribers."""
64 __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
66 def publish(announcement=Announcement):
70 class RIIntroducerSubscriberService_v1(RemoteInterface):
71 __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
73 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
74 """Give me a subscriber reference, and I will call its new_peers()
75 method will any announcements that match the desired service name. I
76 will ignore duplicate subscriptions.
80 class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
81 __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
83 return DictOf(str, Any())
84 def publish(announcement=Announcement):
86 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
89 class IIntroducerClient(Interface):
90 """I provide service introduction facilities for a node. I help nodes
91 publish their services to the rest of the world, and I help them learn
92 about services available on other nodes."""
94 def publish(furl, service_name, remoteinterface_name):
95 """Once you call this, I will tell the world that the Referenceable
96 available at FURL is available to provide a service named
97 SERVICE_NAME. The precise definition of the service being provided is
98 identified by the Foolscap 'remote interface name' in the last
99 parameter: this is supposed to be a globally-unique string that
100 identifies the RemoteInterface that is implemented."""
102 def subscribe_to(service_name, callback, *args, **kwargs):
103 """Call this if you will eventually want to use services with the
104 given SERVICE_NAME. This will prompt me to subscribe to announcements
105 of those services. Your callback will be invoked with at least two
106 arguments: a serverid (binary string), and an announcement
107 dictionary, followed by any additional callback args/kwargs you give
108 me. I will run your callback for both new announcements and for
109 announcements that have changed, but you must be prepared to tolerate
112 The announcement dictionary that I give you will have the following
116 service-name: str('storage')
119 remoteinterface-name: str(ri_name)
123 oldest-supported: str
125 Note that app-version will be an empty dictionary until #466 is done
126 and both the introducer and the remote client have been upgraded. For
127 current (native) server types, the serverid will always be equal to
128 the binary form of the FURL's tubid.
131 def connected_to_introducer():
132 """Returns a boolean, True if we are currently connected to the
133 introducer, False if not."""
136 class IntroducerClient_v1(service.Service, Referenceable):
137 implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
139 def __init__(self, tub, introducer_furl,
140 nickname, my_version, oldest_supported):
142 self.introducer_furl = introducer_furl
144 assert type(nickname) is unicode
145 self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
146 self._my_version = my_version
147 self._oldest_supported = oldest_supported
149 self._published_announcements = set()
151 self._publisher = None
153 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
154 self._subscribed_service_names = set()
155 self._subscriptions = set() # requests we've actually sent
157 # _current_announcements remembers one announcement per
158 # (servicename,serverid) pair. Anything that arrives with the same
159 # pair will displace the previous one. This stores unpacked
160 # announcement dictionaries, which can be compared for equality to
161 # distinguish re-announcement from updates. It also provides memory
162 # for clients who subscribe after startup.
163 self._current_announcements = {}
165 self.encoding_parameters = None
167 # hooks for unit tests
168 self._debug_counts = {
169 "inbound_message": 0,
170 "inbound_announcement": 0,
172 "duplicate_announcement": 0,
174 "new_announcement": 0,
175 "outbound_message": 0,
177 self._debug_outstanding = 0
179 def _debug_retired(self, res):
180 self._debug_outstanding -= 1
183 def startService(self):
184 service.Service.startService(self)
185 self._introducer_error = None
186 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
187 self._introducer_reconnector = rc
188 def connect_failed(failure):
189 self.log("Initial Introducer connection failed: perhaps it's down",
190 level=log.WEIRD, failure=failure, umid="c5MqUQ")
191 d = self._tub.getReference(self.introducer_furl)
192 d.addErrback(connect_failed)
194 def _got_introducer(self, publisher):
195 self.log("connected to introducer, getting versions")
196 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
198 "application-version": "unknown: no get_version()",
200 d = rrefutil.add_version_to_remote_reference(publisher, default)
201 d.addCallback(self._got_versioned_introducer)
202 d.addErrback(self._got_error)
204 def _got_error(self, f):
205 # TODO: for the introducer, perhaps this should halt the application
206 self._introducer_error = f # polled by tests
208 def _got_versioned_introducer(self, publisher):
209 self.log("got introducer version: %s" % (publisher.version,))
210 # we require a V1 introducer
211 needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
212 if needed not in publisher.version:
213 raise InsufficientVersionError(needed, publisher.version)
214 self._publisher = publisher
215 publisher.notifyOnDisconnect(self._disconnected)
216 self._maybe_publish()
217 self._maybe_subscribe()
219 def _disconnected(self):
220 self.log("bummer, we've lost our connection to the introducer")
221 self._publisher = None
222 self._subscriptions.clear()
224 def log(self, *args, **kwargs):
225 if "facility" not in kwargs:
226 kwargs["facility"] = "tahoe.introducer"
227 return log.msg(*args, **kwargs)
230 def publish(self, furl, service_name, remoteinterface_name):
231 assert type(self._nickname_utf8) is str # we always send UTF-8
232 ann = (furl, service_name, remoteinterface_name,
233 self._nickname_utf8, self._my_version, self._oldest_supported)
234 self._published_announcements.add(ann)
235 self._maybe_publish()
237 def subscribe_to(self, service_name, cb, *args, **kwargs):
238 self._local_subscribers.append( (service_name,cb,args,kwargs) )
239 self._subscribed_service_names.add(service_name)
240 self._maybe_subscribe()
241 for (servicename,nodeid),ann_d in self._current_announcements.items():
242 if servicename == service_name:
243 eventually(cb, nodeid, ann_d)
245 def _maybe_subscribe(self):
246 if not self._publisher:
247 self.log("want to subscribe, but no introducer yet",
250 for service_name in self._subscribed_service_names:
251 if service_name not in self._subscriptions:
252 # there is a race here, but the subscription desk ignores
253 # duplicate requests.
254 self._subscriptions.add(service_name)
255 self._debug_outstanding += 1
256 d = self._publisher.callRemote("subscribe", self, service_name)
257 d.addBoth(self._debug_retired)
258 d.addErrback(rrefutil.trap_deadref)
259 d.addErrback(log.err, format="server errored during subscribe",
260 facility="tahoe.introducer",
261 level=log.WEIRD, umid="2uMScQ")
263 def _maybe_publish(self):
264 if not self._publisher:
265 self.log("want to publish, but no introducer yet", level=log.NOISY)
267 # this re-publishes everything. The Introducer ignores duplicates
268 for ann in self._published_announcements:
269 self._debug_counts["outbound_message"] += 1
270 self._debug_outstanding += 1
271 d = self._publisher.callRemote("publish", ann)
272 d.addBoth(self._debug_retired)
273 d.addErrback(rrefutil.trap_deadref)
274 d.addErrback(log.err,
275 format="server errored during publish %(ann)s",
276 ann=ann, facility="tahoe.introducer",
277 level=log.WEIRD, umid="xs9pVQ")
281 def remote_announce(self, announcements):
282 self.log("received %d announcements" % len(announcements))
283 self._debug_counts["inbound_message"] += 1
284 for ann in announcements:
286 self._process_announcement(ann)
288 log.err(format="unable to process announcement %(ann)s",
290 # Don't let a corrupt announcement prevent us from processing
291 # the remaining ones. Don't return an error to the server,
292 # since they'd just ignore it anyways.
295 def _process_announcement(self, ann):
296 self._debug_counts["inbound_announcement"] += 1
297 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
298 if service_name not in self._subscribed_service_names:
299 self.log("announcement for a service we don't care about [%s]"
300 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
301 self._debug_counts["wrong_service"] += 1
303 self.log("announcement for [%s]: %s" % (service_name, ann),
305 assert type(furl) is str
306 assert type(service_name) is str
307 assert type(ri_name) is str
308 assert type(nickname_utf8) is str
309 nickname = nickname_utf8.decode("utf-8")
310 assert type(nickname) is unicode
311 assert type(ver) is str
312 assert type(oldest) is str
314 nodeid = b32decode(SturdyRef(furl).tubID.upper())
315 nodeid_s = idlib.shortnodeid_b2a(nodeid)
317 ann_d = { "version": 0,
318 "service-name": service_name,
321 "nickname": nickname,
322 "app-versions": {}, # need #466 and v2 introducer
324 "oldest-supported": oldest,
327 index = (service_name, nodeid)
328 if self._current_announcements.get(index, None) == ann_d:
329 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
330 service=service_name, nodeid=nodeid_s,
331 level=log.UNUSUAL, umid="B1MIdA")
332 self._debug_counts["duplicate_announcement"] += 1
334 if index in self._current_announcements:
335 self._debug_counts["update"] += 1
337 self._debug_counts["new_announcement"] += 1
339 self._current_announcements[index] = ann_d
340 # note: we never forget an index, but we might update its value
342 for (service_name2,cb,args,kwargs) in self._local_subscribers:
343 if service_name2 == service_name:
344 eventually(cb, nodeid, ann_d, *args, **kwargs)
346 def remote_set_encoding_parameters(self, parameters):
347 self.encoding_parameters = parameters
349 def connected_to_introducer(self):
350 return bool(self._publisher)
352 class IntroducerService_v1(service.MultiService, Referenceable):
353 implements(RIIntroducerPublisherAndSubscriberService_v1)
355 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
357 "application-version": str(allmydata.__full_version__),
360 def __init__(self, basedir="."):
361 service.MultiService.__init__(self)
362 self.introducer_url = None
363 # 'index' is (service_name, tubid)
364 self._announcements = {} # dict of index -> (announcement, timestamp)
365 self._subscribers = {} # dict of (rref->timestamp) dicts
366 self._debug_counts = {"inbound_message": 0,
367 "inbound_duplicate": 0,
369 "outbound_message": 0,
370 "outbound_announcements": 0,
371 "inbound_subscribe": 0}
372 self._debug_outstanding = 0
374 def _debug_retired(self, res):
375 self._debug_outstanding -= 1
378 def log(self, *args, **kwargs):
379 if "facility" not in kwargs:
380 kwargs["facility"] = "tahoe.introducer"
381 return log.msg(*args, **kwargs)
383 def get_announcements(self):
384 return self._announcements
385 def get_subscribers(self):
386 return self._subscribers
388 def remote_get_version(self):
391 def remote_publish(self, announcement):
393 self._publish(announcement)
395 log.err(format="Introducer.remote_publish failed on %(ann)s",
396 ann=announcement, level=log.UNUSUAL, umid="620rWA")
399 def _publish(self, announcement):
400 self._debug_counts["inbound_message"] += 1
401 self.log("introducer: announcement published: %s" % (announcement,) )
402 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
403 #print "PUB", service_name, nickname_utf8
405 nodeid = b32decode(SturdyRef(furl).tubID.upper())
406 index = (service_name, nodeid)
408 if index in self._announcements:
409 (old_announcement, timestamp) = self._announcements[index]
410 if old_announcement == announcement:
411 self.log("but we already knew it, ignoring", level=log.NOISY)
412 self._debug_counts["inbound_duplicate"] += 1
415 self.log("old announcement being updated", level=log.NOISY)
416 self._debug_counts["inbound_update"] += 1
417 self._announcements[index] = (announcement, time.time())
419 for s in self._subscribers.get(service_name, []):
420 self._debug_counts["outbound_message"] += 1
421 self._debug_counts["outbound_announcements"] += 1
422 self._debug_outstanding += 1
423 d = s.callRemote("announce", set([announcement]))
424 d.addBoth(self._debug_retired)
425 d.addErrback(rrefutil.trap_deadref)
426 d.addErrback(log.err,
427 format="subscriber errored on announcement %(ann)s",
428 ann=announcement, facility="tahoe.introducer",
429 level=log.UNUSUAL, umid="jfGMXQ")
431 def remote_subscribe(self, subscriber, service_name):
432 self.log("introducer: subscription[%s] request at %s" % (service_name,
434 self._debug_counts["inbound_subscribe"] += 1
435 if service_name not in self._subscribers:
436 self._subscribers[service_name] = {}
437 subscribers = self._subscribers[service_name]
438 if subscriber in subscribers:
439 self.log("but they're already subscribed, ignoring",
442 subscribers[subscriber] = time.time()
444 self.log("introducer: unsubscribing[%s] %s" % (service_name,
446 subscribers.pop(subscriber, None)
447 subscriber.notifyOnDisconnect(_remove)
451 for (sn2,nodeid),(ann,when) in self._announcements.items()
452 if sn2 == service_name] )
454 self._debug_counts["outbound_message"] += 1
455 self._debug_counts["outbound_announcements"] += len(announcements)
456 self._debug_outstanding += 1
457 d = subscriber.callRemote("announce", announcements)
458 d.addBoth(self._debug_retired)
459 d.addErrback(rrefutil.trap_deadref)
460 d.addErrback(log.err,
461 format="subscriber errored during subscribe %(anns)s",
462 anns=announcements, facility="tahoe.introducer",
463 level=log.UNUSUAL, umid="1XChxA")