git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
#518: replace various BASEDIR/* config files with a single BASEDIR/tahoe.cfg, with...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path
3 from zope.interface import implements
4 from twisted.application import service
5 from foolscap import Referenceable
6 from allmydata import node
7 from allmydata.util import log
8 from allmydata.introducer.interfaces import \
9      RIIntroducerPublisherAndSubscriberService
10 from allmydata.introducer.common import make_index
11
class IntroducerNode(node.Node):
    """A node.Node subclass that hosts an IntroducerService.

    It can also attach an IntroducerWebishServer when a web port is
    configured.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"

    def __init__(self, basedir="."):
        """Initialize the node in 'basedir' and attach its services.

        The config is read first, then the introducer service is set up.
        The web server is attached only when
        get_config("node", "web.port", None) yields a true value.
        """
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        web_port = self.get_config("node", "web.port", None)
        if not web_port:
            # no web port configured: nothing more to set up
            return
        self.init_web(web_port) # strports string

    def init_introducer(self):
        """Attach an IntroducerService and publish it once the tub is ready.

        When the Deferred from when_tub_ready() fires, the service is
        registered with the tub under the name "introducer". The resulting
        URL is stored in self.introducer_url, logged, and handed to
        write_config() as "introducer.furl". Failures are reported through
        log.err.
        """
        svc = IntroducerService(self.basedir)
        self.add_service(svc)

        def _register(_ignored):
            # the callback argument is not used; only the timing matters
            furl = self.tub.registerReference(svc, "introducer")
            self.introducer_url = furl
            self.log(" introducer is at %s" % furl)
            self.write_config("introducer.furl", furl + "\n")

        ready = self.when_tub_ready()
        ready.addCallback(_register)
        ready.addErrback(log.err, facility="tahoe.init",
                         level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Attach an IntroducerWebishServer built from 'webport'.

        'webport' is a strports string. The server is also given the path
        of "node.url" inside this node's basedir.
        """
        self.log("init_web(webport=%s)", args=(webport,))

        # imported at call time rather than at module level, so the import
        # only happens when a web port is actually configured
        from allmydata.webish import IntroducerWebishServer
        web_server = IntroducerWebishServer(
            webport, os.path.join(self.basedir, "node.url"))
        self.add_service(web_server)
45
class IntroducerService(service.MultiService, Referenceable):
    """Remotely-callable service that relays announcements to subscribers.

    Publishers call remote_publish() with an announcement tuple.
    Subscribers call remote_subscribe() with a remote reference and a
    service name; they are sent (via "announce") the announcements already
    known for that service name, and later ones as they are published.
    """
    implements(RIIntroducerPublisherAndSubscriberService)
    name = "introducer"

    def __init__(self, basedir="."):
        """Create an empty service. 'basedir' is accepted but not used here."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (tubid, service_name)
        self._announcements = {} # dict of index -> (announcement, timestamp)
        # dict of service_name -> (dict of rref -> subscription timestamp)
        self._subscribers = {}

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting 'facility' to "tahoe.introducer"."""
        kwargs.setdefault("facility", "tahoe.introducer")
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return the live dict of index -> (announcement, timestamp)."""
        return self._announcements
    def get_subscribers(self):
        """Return the live dict of service_name -> {rref: timestamp}."""
        return self._subscribers

    def remote_publish(self, announcement):
        """Record 'announcement' and forward it to matching subscribers.

        'announcement' is a 6-tuple of (furl, service_name, ri_name,
        nickname, ver, oldest). An announcement equal to the one already
        stored under the same index is ignored. Otherwise it is stored
        with the current time, replacing any older entry, and sent to
        every current subscriber of its service_name.
        """
        self.log("introducer: announcement published: %s" % (announcement,) )
        index = make_index(announcement)
        if index in self._announcements:
            old_announcement = self._announcements[index][0]
            if old_announcement == announcement:
                self.log("but we already knew it, ignoring", level=log.NOISY)
                return
            self.log("old announcement being updated", level=log.NOISY)
        self._announcements[index] = (announcement, time.time())
        # full unpack kept so a malformed announcement still raises here
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
        for s in self._subscribers.get(service_name, []):
            d = s.callRemote("announce", set([announcement]))
            # Report delivery failures the same way remote_subscribe()
            # does, rather than leaving an unhandled error in the Deferred.
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.UNUSUAL)

    def remote_subscribe(self, subscriber, service_name):
        """Subscribe 'subscriber' to announcements for 'service_name'.

        A subscriber already registered for this service name is ignored.
        Otherwise it is recorded with the current time, removed again when
        it disconnects, and immediately sent every known announcement
        whose index names this service.
        """
        self.log("introducer: subscription[%s] request at %s" % (service_name,
                                                                 subscriber))
        subscribers = self._subscribers.setdefault(service_name, {})
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL)
            return
        subscribers[subscriber] = time.time()
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber))
            # pop with a default: tolerate the entry already being gone
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # index is (tubid, service_name): select this service's entries
        announcements = set( [ ann
                               for idx,(ann,when) in self._announcements.items()
                               if idx[1] == service_name] )
        d = subscriber.callRemote("announce", announcements)
        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
104
105
106