3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap import Referenceable
6 from allmydata import node
7 from allmydata.util import log
8 from allmydata.introducer.interfaces import \
9 RIIntroducerPublisherAndSubscriberService
10 from allmydata.introducer.common import make_index
12 class IntroducerNode(node.Node):
13 PORTNUMFILE = "introducer.port"
14 NODETYPE = "introducer"
16 def __init__(self, basedir="."):
17 node.Node.__init__(self, basedir)
19 self.init_introducer()
20 webport = self.get_config("node", "web.port", None)
22 self.init_web(webport) # strports string
24 def init_introducer(self):
25 introducerservice = IntroducerService(self.basedir)
26 self.add_service(introducerservice)
28 d = self.when_tub_ready()
30 self.introducer_url = self.tub.registerReference(introducerservice,
32 self.log(" introducer is at %s" % self.introducer_url)
33 self.write_config("introducer.furl", self.introducer_url + "\n")
34 d.addCallback(_publish)
35 d.addErrback(log.err, facility="tahoe.init",
36 level=log.BAD, umid="UaNs9A")
38 def init_web(self, webport):
39 self.log("init_web(webport=%s)", args=(webport,))
41 from allmydata.webish import IntroducerWebishServer
42 nodeurl_path = os.path.join(self.basedir, "node.url")
43 ws = IntroducerWebishServer(webport, nodeurl_path)
46 class IntroducerService(service.MultiService, Referenceable):
47 implements(RIIntroducerPublisherAndSubscriberService)
50 def __init__(self, basedir="."):
51 service.MultiService.__init__(self)
52 self.introducer_url = None
53 # 'index' is (tubid, service_name)
54 self._announcements = {} # dict of index -> (announcement, timestamp)
55 self._subscribers = {} # dict of (rref->timestamp) dicts
57 def log(self, *args, **kwargs):
58 if "facility" not in kwargs:
59 kwargs["facility"] = "tahoe.introducer"
60 return log.msg(*args, **kwargs)
62 def get_announcements(self):
63 return self._announcements
64 def get_subscribers(self):
65 return self._subscribers
67 def remote_publish(self, announcement):
68 self.log("introducer: announcement published: %s" % (announcement,) )
69 index = make_index(announcement)
70 if index in self._announcements:
71 (old_announcement, timestamp) = self._announcements[index]
72 if old_announcement == announcement:
73 self.log("but we already knew it, ignoring", level=log.NOISY)
76 self.log("old announcement being updated", level=log.NOISY)
77 self._announcements[index] = (announcement, time.time())
78 (furl, service_name, ri_name, nickname, ver, oldest) = announcement
79 for s in self._subscribers.get(service_name, []):
80 s.callRemote("announce", set([announcement]))
82 def remote_subscribe(self, subscriber, service_name):
83 self.log("introducer: subscription[%s] request at %s" % (service_name,
85 if service_name not in self._subscribers:
86 self._subscribers[service_name] = {}
87 subscribers = self._subscribers[service_name]
88 if subscriber in subscribers:
89 self.log("but they're already subscribed, ignoring",
92 subscribers[subscriber] = time.time()
94 self.log("introducer: unsubscribing[%s] %s" % (service_name,
96 subscribers.pop(subscriber, None)
97 subscriber.notifyOnDisconnect(_remove)
99 announcements = set( [ ann
100 for idx,(ann,when) in self._announcements.items()
101 if idx[1] == service_name] )
102 d = subscriber.callRemote("announce", announcements)
103 d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)