3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap.api import Referenceable, SturdyRef
8 from allmydata import node
9 from allmydata.util import log, rrefutil
10 from allmydata.introducer.interfaces import \
11 RIIntroducerPublisherAndSubscriberService
13 class IntroducerNode(node.Node):
14 PORTNUMFILE = "introducer.port"
15 NODETYPE = "introducer"
16 GENERATED_FILES = ['introducer.furl']
18 def __init__(self, basedir="."):
19 node.Node.__init__(self, basedir)
21 self.init_introducer()
22 webport = self.get_config("node", "web.port", None)
24 self.init_web(webport) # strports string
26 def init_introducer(self):
27 introducerservice = IntroducerService(self.basedir)
28 self.add_service(introducerservice)
30 d = self.when_tub_ready()
32 self.introducer_url = self.tub.registerReference(introducerservice,
34 self.log(" introducer is at %s" % self.introducer_url)
35 self.write_config("introducer.furl", self.introducer_url + "\n")
36 d.addCallback(_publish)
37 d.addErrback(log.err, facility="tahoe.init",
38 level=log.BAD, umid="UaNs9A")
40 def init_web(self, webport):
41 self.log("init_web(webport=%s)", args=(webport,))
43 from allmydata.webish import IntroducerWebishServer
44 nodeurl_path = os.path.join(self.basedir, "node.url")
45 staticdir = self.get_config("node", "web.static", "public_html")
46 staticdir = os.path.expanduser(staticdir)
47 ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
50 class IntroducerService(service.MultiService, Referenceable):
51 implements(RIIntroducerPublisherAndSubscriberService)
53 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
55 "application-version": str(allmydata.__full_version__),
58 def __init__(self, basedir="."):
59 service.MultiService.__init__(self)
60 self.introducer_url = None
61 # 'index' is (service_name, tubid)
62 self._announcements = {} # dict of index -> (announcement, timestamp)
63 self._subscribers = {} # dict of (rref->timestamp) dicts
64 self._debug_counts = {"inbound_message": 0,
65 "inbound_duplicate": 0,
67 "outbound_message": 0,
68 "outbound_announcements": 0,
69 "inbound_subscribe": 0}
71 def log(self, *args, **kwargs):
72 if "facility" not in kwargs:
73 kwargs["facility"] = "tahoe.introducer"
74 return log.msg(*args, **kwargs)
76 def get_announcements(self):
77 return self._announcements
78 def get_subscribers(self):
79 return self._subscribers
81 def remote_get_version(self):
84 def remote_publish(self, announcement):
86 self._publish(announcement)
88 log.err(format="Introducer.remote_publish failed on %(ann)s",
89 ann=announcement, level=log.UNUSUAL, umid="620rWA")
92 def _publish(self, announcement):
93 self._debug_counts["inbound_message"] += 1
94 self.log("introducer: announcement published: %s" % (announcement,) )
95 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
97 nodeid = b32decode(SturdyRef(furl).tubID.upper())
98 index = (service_name, nodeid)
100 if index in self._announcements:
101 (old_announcement, timestamp) = self._announcements[index]
102 if old_announcement == announcement:
103 self.log("but we already knew it, ignoring", level=log.NOISY)
104 self._debug_counts["inbound_duplicate"] += 1
107 self.log("old announcement being updated", level=log.NOISY)
108 self._debug_counts["inbound_update"] += 1
109 self._announcements[index] = (announcement, time.time())
111 for s in self._subscribers.get(service_name, []):
112 self._debug_counts["outbound_message"] += 1
113 self._debug_counts["outbound_announcements"] += 1
114 d = s.callRemote("announce", set([announcement]))
115 d.addErrback(rrefutil.trap_deadref)
116 d.addErrback(log.err,
117 format="subscriber errored on announcement %(ann)s",
118 ann=announcement, facility="tahoe.introducer",
119 level=log.UNUSUAL, umid="jfGMXQ")
121 def remote_subscribe(self, subscriber, service_name):
122 self.log("introducer: subscription[%s] request at %s" % (service_name,
124 self._debug_counts["inbound_subscribe"] += 1
125 if service_name not in self._subscribers:
126 self._subscribers[service_name] = {}
127 subscribers = self._subscribers[service_name]
128 if subscriber in subscribers:
129 self.log("but they're already subscribed, ignoring",
132 subscribers[subscriber] = time.time()
134 self.log("introducer: unsubscribing[%s] %s" % (service_name,
136 subscribers.pop(subscriber, None)
137 subscriber.notifyOnDisconnect(_remove)
141 for (sn2,nodeid),(ann,when) in self._announcements.items()
142 if sn2 == service_name] )
144 self._debug_counts["outbound_message"] += 1
145 self._debug_counts["outbound_announcements"] += len(announcements)
146 d = subscriber.callRemote("announce", announcements)
147 d.addErrback(rrefutil.trap_deadref)
148 d.addErrback(log.err,
149 format="subscriber errored during subscribe %(anns)s",
150 anns=announcements, facility="tahoe.introducer",
151 level=log.UNUSUAL, umid="mtZepQ")