3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10 RemoteInterface, Referenceable, eventually, SturdyRef
11 from allmydata.introducer.common import SubscriberDescriptor, \
12 AnnouncementDescriptor
13 FURL = StringConstraint(1000)
15 # We keep a copy of the old introducer (both client and server) here to
16 # support compatibility tests. The old client is supposed to handle the new
17 # server, and new client is supposed to handle the old server.
20 # Announcements are (FURL, service_name, remoteinterface_name,
21 # nickname, my_version, oldest_supported)
22 # the (FURL, service_name, remoteinterface_name) refer to the service being
23 # announced. The (nickname, my_version, oldest_supported) refer to the
24 # client as a whole. The my_version/oldest_supported strings can be parsed
25 # by an allmydata.util.version.Version instance, and then compared. The
26 # first goal is to make sure that nodes are not confused by speaking to an
27 # incompatible peer. The second goal is to enable the development of
28 # backwards-compatibility code.
30 Announcement = TupleOf(FURL, str, str,
33 class RIIntroducerSubscriberClient_v1(RemoteInterface):
34 __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
36 def announce(announcements=SetOf(Announcement)):
37 """I accept announcements from the publisher."""
40 def set_encoding_parameters(parameters=(int, int, int)):
41 """Advise the client of the recommended k-of-n encoding parameters
42 for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
43 is the total number of shares that will be created for any given
44 file, while 'k' is the number of shares that must be retrieved to
45 recover that file, and 'desired' is the minimum number of shares that
46 must be placed before the uploader will consider its job a success.
47 n/k is the expansion ratio, while k determines the robustness.
49 Introducers should specify 'n' according to the expected size of the
50 grid (there is no point to producing more shares than there are
51 peers), and k according to the desired reliability-vs-overhead goals.
53 Note that setting k=1 is equivalent to simple replication.
57 # When Foolscap can handle multiple interfaces (Foolscap#17), the
58 # full-powered introducer will implement both RIIntroducerPublisher and
59 # RIIntroducerSubscriberService. Until then, we define
60 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
61 # make everybody use that.
63 class RIIntroducerPublisher_v1(RemoteInterface):
64 """To publish a service to the world, connect to me and give me your
65 announcement message. I will deliver a copy to all connected subscribers."""
66 __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
68 def publish(announcement=Announcement):
72 class RIIntroducerSubscriberService_v1(RemoteInterface):
73 __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
75 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
76 """Give me a subscriber reference, and I will call its new_peers()
77 method will any announcements that match the desired service name. I
78 will ignore duplicate subscriptions.
82 class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
83 __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
85 return DictOf(str, Any())
86 def publish(announcement=Announcement):
88 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
91 class IIntroducerClient(Interface):
92 """I provide service introduction facilities for a node. I help nodes
93 publish their services to the rest of the world, and I help them learn
94 about services available on other nodes."""
96 def publish(furl, service_name, remoteinterface_name):
97 """Once you call this, I will tell the world that the Referenceable
98 available at FURL is available to provide a service named
99 SERVICE_NAME. The precise definition of the service being provided is
100 identified by the Foolscap 'remote interface name' in the last
101 parameter: this is supposed to be a globally-unique string that
102 identifies the RemoteInterface that is implemented."""
104 def subscribe_to(service_name, callback, *args, **kwargs):
105 """Call this if you will eventually want to use services with the
106 given SERVICE_NAME. This will prompt me to subscribe to announcements
107 of those services. Your callback will be invoked with at least two
108 arguments: a serverid (binary string), and an announcement
109 dictionary, followed by any additional callback args/kwargs you give
110 me. I will run your callback for both new announcements and for
111 announcements that have changed, but you must be prepared to tolerate
114 The announcement dictionary that I give you will have the following
118 service-name: str('storage')
121 remoteinterface-name: str(ri_name)
125 oldest-supported: str
127 Note that app-version will be an empty dictionary until #466 is done
128 and both the introducer and the remote client have been upgraded. For
129 current (native) server types, the serverid will always be equal to
130 the binary form of the FURL's tubid.
133 def connected_to_introducer():
134 """Returns a boolean, True if we are currently connected to the
135 introducer, False if not."""
138 class IntroducerClient_v1(service.Service, Referenceable):
139 implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
141 def __init__(self, tub, introducer_furl,
142 nickname, my_version, oldest_supported):
144 self.introducer_furl = introducer_furl
146 assert type(nickname) is unicode
147 self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
148 self._my_version = my_version
149 self._oldest_supported = oldest_supported
151 self._published_announcements = set()
153 self._publisher = None
155 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
156 self._subscribed_service_names = set()
157 self._subscriptions = set() # requests we've actually sent
159 # _current_announcements remembers one announcement per
160 # (servicename,serverid) pair. Anything that arrives with the same
161 # pair will displace the previous one. This stores unpacked
162 # announcement dictionaries, which can be compared for equality to
163 # distinguish re-announcement from updates. It also provides memory
164 # for clients who subscribe after startup.
165 self._current_announcements = {}
167 self.encoding_parameters = None
169 # hooks for unit tests
170 self._debug_counts = {
171 "inbound_message": 0,
172 "inbound_announcement": 0,
174 "duplicate_announcement": 0,
176 "new_announcement": 0,
177 "outbound_message": 0,
179 self._debug_outstanding = 0
181 def _debug_retired(self, res):
182 self._debug_outstanding -= 1
185 def startService(self):
186 service.Service.startService(self)
187 self._introducer_error = None
188 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
189 self._introducer_reconnector = rc
190 def connect_failed(failure):
191 self.log("Initial Introducer connection failed: perhaps it's down",
192 level=log.WEIRD, failure=failure, umid="c5MqUQ")
193 d = self._tub.getReference(self.introducer_furl)
194 d.addErrback(connect_failed)
196 def _got_introducer(self, publisher):
197 self.log("connected to introducer, getting versions")
198 default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
200 "application-version": "unknown: no get_version()",
202 d = rrefutil.add_version_to_remote_reference(publisher, default)
203 d.addCallback(self._got_versioned_introducer)
204 d.addErrback(self._got_error)
206 def _got_error(self, f):
207 # TODO: for the introducer, perhaps this should halt the application
208 self._introducer_error = f # polled by tests
210 def _got_versioned_introducer(self, publisher):
211 self.log("got introducer version: %s" % (publisher.version,))
212 # we require a V1 introducer
213 needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
214 if needed not in publisher.version:
215 raise InsufficientVersionError(needed, publisher.version)
216 self._publisher = publisher
217 publisher.notifyOnDisconnect(self._disconnected)
218 self._maybe_publish()
219 self._maybe_subscribe()
221 def _disconnected(self):
222 self.log("bummer, we've lost our connection to the introducer")
223 self._publisher = None
224 self._subscriptions.clear()
226 def log(self, *args, **kwargs):
227 if "facility" not in kwargs:
228 kwargs["facility"] = "tahoe.introducer"
229 return log.msg(*args, **kwargs)
232 def publish(self, furl, service_name, remoteinterface_name):
233 assert type(self._nickname_utf8) is str # we always send UTF-8
234 ann = (furl, service_name, remoteinterface_name,
235 self._nickname_utf8, self._my_version, self._oldest_supported)
236 self._published_announcements.add(ann)
237 self._maybe_publish()
239 def subscribe_to(self, service_name, cb, *args, **kwargs):
240 self._local_subscribers.append( (service_name,cb,args,kwargs) )
241 self._subscribed_service_names.add(service_name)
242 self._maybe_subscribe()
243 for (servicename,nodeid),ann_d in self._current_announcements.items():
244 if servicename == service_name:
245 eventually(cb, nodeid, ann_d)
247 def _maybe_subscribe(self):
248 if not self._publisher:
249 self.log("want to subscribe, but no introducer yet",
252 for service_name in self._subscribed_service_names:
253 if service_name not in self._subscriptions:
254 # there is a race here, but the subscription desk ignores
255 # duplicate requests.
256 self._subscriptions.add(service_name)
257 self._debug_outstanding += 1
258 d = self._publisher.callRemote("subscribe", self, service_name)
259 d.addBoth(self._debug_retired)
260 d.addErrback(rrefutil.trap_deadref)
261 d.addErrback(log.err, format="server errored during subscribe",
262 facility="tahoe.introducer",
263 level=log.WEIRD, umid="2uMScQ")
265 def _maybe_publish(self):
266 if not self._publisher:
267 self.log("want to publish, but no introducer yet", level=log.NOISY)
269 # this re-publishes everything. The Introducer ignores duplicates
270 for ann in self._published_announcements:
271 self._debug_counts["outbound_message"] += 1
272 self._debug_outstanding += 1
273 d = self._publisher.callRemote("publish", ann)
274 d.addBoth(self._debug_retired)
275 d.addErrback(rrefutil.trap_deadref)
276 d.addErrback(log.err,
277 format="server errored during publish %(ann)s",
278 ann=ann, facility="tahoe.introducer",
279 level=log.WEIRD, umid="xs9pVQ")
283 def remote_announce(self, announcements):
284 self.log("received %d announcements" % len(announcements))
285 self._debug_counts["inbound_message"] += 1
286 for ann in announcements:
288 self._process_announcement(ann)
290 log.err(format="unable to process announcement %(ann)s",
292 # Don't let a corrupt announcement prevent us from processing
293 # the remaining ones. Don't return an error to the server,
294 # since they'd just ignore it anyways.
297 def _process_announcement(self, ann):
298 self._debug_counts["inbound_announcement"] += 1
299 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
300 if service_name not in self._subscribed_service_names:
301 self.log("announcement for a service we don't care about [%s]"
302 % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
303 self._debug_counts["wrong_service"] += 1
305 self.log("announcement for [%s]: %s" % (service_name, ann),
307 assert type(furl) is str
308 assert type(service_name) is str
309 assert type(ri_name) is str
310 assert type(nickname_utf8) is str
311 nickname = nickname_utf8.decode("utf-8")
312 assert type(nickname) is unicode
313 assert type(ver) is str
314 assert type(oldest) is str
316 nodeid = b32decode(SturdyRef(furl).tubID.upper())
317 nodeid_s = idlib.shortnodeid_b2a(nodeid)
319 ann_d = { "version": 0,
320 "service-name": service_name,
323 "nickname": nickname,
324 "app-versions": {}, # need #466 and v2 introducer
326 "oldest-supported": oldest,
329 index = (service_name, nodeid)
330 if self._current_announcements.get(index, None) == ann_d:
331 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
332 service=service_name, nodeid=nodeid_s,
333 level=log.UNUSUAL, umid="B1MIdA")
334 self._debug_counts["duplicate_announcement"] += 1
336 if index in self._current_announcements:
337 self._debug_counts["update"] += 1
339 self._debug_counts["new_announcement"] += 1
341 self._current_announcements[index] = ann_d
342 # note: we never forget an index, but we might update its value
344 for (service_name2,cb,args,kwargs) in self._local_subscribers:
345 if service_name2 == service_name:
346 eventually(cb, nodeid, ann_d, *args, **kwargs)
348 def remote_set_encoding_parameters(self, parameters):
349 self.encoding_parameters = parameters
351 def connected_to_introducer(self):
352 return bool(self._publisher)
354 class IntroducerService_v1(service.MultiService, Referenceable):
355 implements(RIIntroducerPublisherAndSubscriberService_v1)
357 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
359 "application-version": str(allmydata.__full_version__),
362 def __init__(self, basedir="."):
363 service.MultiService.__init__(self)
364 self.introducer_url = None
365 # 'index' is (service_name, tubid)
366 self._announcements = {} # dict of index -> (announcement, timestamp)
367 self._subscribers = {} # [service_name]->[rref]->timestamp
368 self._debug_counts = {"inbound_message": 0,
369 "inbound_duplicate": 0,
371 "outbound_message": 0,
372 "outbound_announcements": 0,
373 "inbound_subscribe": 0}
374 self._debug_outstanding = 0
376 def _debug_retired(self, res):
377 self._debug_outstanding -= 1
380 def log(self, *args, **kwargs):
381 if "facility" not in kwargs:
382 kwargs["facility"] = "tahoe.introducer"
383 return log.msg(*args, **kwargs)
385 def get_announcements(self, include_stub_clients=True):
387 for index, (ann_t, when) in self._announcements.items():
388 (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
389 if service_name == "stub_client" and not include_stub_clients:
391 ann_d = {"nickname": nickname.decode("utf-8", "replace"),
393 "service-name": service_name,
394 "anonymous-storage-FURL": furl,
396 # the V2 introducer uses (service_name, key_s, tubid_s) as an
397 # index, so match that format for AnnouncementDescriptor
398 new_index = (index[0], None, idlib.nodeid_b2a(index[1]))
399 ad = AnnouncementDescriptor(when, new_index, None, ann_d)
400 announcements.append(ad)
403 def get_subscribers(self):
405 for service_name, subscribers in self._subscribers.items():
406 for rref, when in subscribers.items():
407 tubid = rref.getRemoteTubID() or "?"
408 remote_address = rrefutil.stringify_remote_address(rref)
409 nickname, version, app_versions = u"?", u"?", {}
410 sd = SubscriberDescriptor(service_name, when,
411 nickname, version, app_versions,
412 remote_address, tubid)
416 def remote_get_version(self):
419 def remote_publish(self, announcement):
421 self._publish(announcement)
423 log.err(format="Introducer.remote_publish failed on %(ann)s",
424 ann=announcement, level=log.UNUSUAL, umid="620rWA")
427 def _publish(self, announcement):
428 self._debug_counts["inbound_message"] += 1
429 self.log("introducer: announcement published: %s" % (announcement,) )
430 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
431 #print "PUB", service_name, nickname_utf8
433 nodeid = b32decode(SturdyRef(furl).tubID.upper())
434 index = (service_name, nodeid)
436 if index in self._announcements:
437 (old_announcement, timestamp) = self._announcements[index]
438 if old_announcement == announcement:
439 self.log("but we already knew it, ignoring", level=log.NOISY)
440 self._debug_counts["inbound_duplicate"] += 1
443 self.log("old announcement being updated", level=log.NOISY)
444 self._debug_counts["inbound_update"] += 1
445 self._announcements[index] = (announcement, time.time())
447 for s in self._subscribers.get(service_name, []):
448 self._debug_counts["outbound_message"] += 1
449 self._debug_counts["outbound_announcements"] += 1
450 self._debug_outstanding += 1
451 d = s.callRemote("announce", set([announcement]))
452 d.addBoth(self._debug_retired)
453 d.addErrback(rrefutil.trap_deadref)
454 d.addErrback(log.err,
455 format="subscriber errored on announcement %(ann)s",
456 ann=announcement, facility="tahoe.introducer",
457 level=log.UNUSUAL, umid="jfGMXQ")
459 def remote_subscribe(self, subscriber, service_name):
460 self.log("introducer: subscription[%s] request at %s" % (service_name,
462 self._debug_counts["inbound_subscribe"] += 1
463 if service_name not in self._subscribers:
464 self._subscribers[service_name] = {}
465 subscribers = self._subscribers[service_name]
466 if subscriber in subscribers:
467 self.log("but they're already subscribed, ignoring",
470 subscribers[subscriber] = time.time()
472 self.log("introducer: unsubscribing[%s] %s" % (service_name,
474 subscribers.pop(subscriber, None)
475 subscriber.notifyOnDisconnect(_remove)
479 for (sn2,nodeid),(ann,when) in self._announcements.items()
480 if sn2 == service_name] )
482 self._debug_counts["outbound_message"] += 1
483 self._debug_counts["outbound_announcements"] += len(announcements)
484 self._debug_outstanding += 1
485 d = subscriber.callRemote("announce", announcements)
486 d.addBoth(self._debug_retired)
487 d.addErrback(rrefutil.trap_deadref)
488 d.addErrback(log.err,
489 format="subscriber errored during subscribe %(anns)s",
490 anns=announcements, facility="tahoe.introducer",
491 level=log.UNUSUAL, umid="1XChxA")