]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/server.py
introducer web page: add CSS styling, roughly match client Welcome page
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / server.py
1
2 import time, os.path
3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap.api import Referenceable, SturdyRef
7 import allmydata
8 from allmydata import node
9 from allmydata.util import log, rrefutil
10 from allmydata.introducer.interfaces import \
11      RIIntroducerPublisherAndSubscriberService
12
class IntroducerNode(node.Node):
    """Node subclass that hosts an IntroducerService.

    Construction reads the node configuration, sets up the introducer
    service, and starts a web server when the [node] "web.port" setting
    is present and non-empty.
    """
    PORTNUMFILE = "introducer.port"
    NODETYPE = "introducer"
    GENERATED_FILES = ['introducer.furl']

    def __init__(self, basedir="."):
        """Initialise the node rooted at basedir and start its services."""
        node.Node.__init__(self, basedir)
        self.read_config()
        self.init_introducer()
        # The configured value is an strports string; an absent or empty
        # setting means no web server is started.
        port_spec = self.get_config("node", "web.port", None)
        if not port_spec:
            return
        self.init_web(port_spec)

    def init_introducer(self):
        """Add an IntroducerService and arrange for its FURL to be recorded.

        When the deferred from when_tub_ready() fires, the service is
        registered with the tub under the name "introducer"; the resulting
        URL is stored on self.introducer_url, logged, and written to the
        "introducer.furl" config file. Any failure along the way is handed
        to log.err.
        """
        svc = IntroducerService(self.basedir)
        self.add_service(svc)

        def _register(_ignored):
            furl = self.tub.registerReference(svc, "introducer")
            self.introducer_url = furl
            self.log(" introducer is at %s" % furl)
            self.write_config("introducer.furl", furl + "\n")

        ready = self.when_tub_ready()
        ready.addCallback(_register)
        ready.addErrback(log.err, facility="tahoe.init",
                         level=log.BAD, umid="UaNs9A")

    def init_web(self, webport):
        """Create an IntroducerWebishServer for webport and add it as a service.

        webport is passed through unchanged (the caller treats it as an
        strports string). The static directory comes from the [node]
        "web.static" setting, defaulting to "public_html", with "~"
        expanded.
        """
        self.log("init_web(webport=%s)", args=(webport,))

        # Imported here rather than at module level, as in the original.
        from allmydata.webish import IntroducerWebishServer
        static_setting = self.get_config("node", "web.static", "public_html")
        webserver = IntroducerWebishServer(
            self,
            webport,
            os.path.join(self.basedir, "node.url"),
            os.path.expanduser(static_setting))
        self.add_service(webserver)
49
class IntroducerService(service.MultiService, Referenceable):
    """Remotely-callable service that stores announcements and relays them.

    Published announcements are remembered per (service_name, nodeid) and
    forwarded to every subscriber registered for the same service name.
    New subscribers immediately receive all stored announcements for the
    service name they asked for.
    """
    implements(RIIntroducerPublisherAndSubscriberService)
    name = "introducer"
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v1": {},
        "application-version": str(allmydata.__full_version__),
    }

    def __init__(self, basedir="."):
        """Set up empty announcement/subscriber tables and debug counters."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        # Maps index -> (announcement, timestamp), where index is
        # (service_name, nodeid) and nodeid is the base32-decoded tub ID
        # taken from the announcement's FURL.
        self._announcements = {}
        # Maps service_name -> {subscriber rref: time of subscription}.
        self._subscribers = {}
        self._debug_counts = dict.fromkeys(
            ["inbound_message",
             "inbound_duplicate",
             "inbound_update",
             "outbound_message",
             "outbound_announcements",
             "inbound_subscribe"],
            0)

    def log(self, *args, **kwargs):
        """Forward to log.msg, defaulting facility to "tahoe.introducer"."""
        kwargs.setdefault("facility", "tahoe.introducer")
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return the live index -> (announcement, timestamp) dict."""
        return self._announcements

    def get_subscribers(self):
        """Return the live service_name -> {rref: timestamp} dict."""
        return self._subscribers

    def remote_get_version(self):
        """Return the VERSION dict."""
        return self.VERSION

    def remote_publish(self, announcement):
        """Record an announcement, logging and re-raising any failure."""
        try:
            self._publish(announcement)
        except:
            # Deliberately catches everything: the error is logged with the
            # offending announcement and then propagated unchanged.
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
            raise

    def _publish(self, announcement):
        """Store announcement and send it to subscribers of its service.

        announcement must unpack into exactly six fields: (furl,
        service_name, ri_name, nickname_utf8, ver, oldest). An announcement
        equal to the one already stored for the same index is counted as a
        duplicate and not forwarded.
        """
        counts = self._debug_counts
        counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (announcement,) )
        # Only furl and service_name are used below; the full unpack is kept
        # so that a wrongly-shaped announcement still raises here.
        (furl, service_name,
         _ri_name, _nickname_utf8, _ver, _oldest) = announcement

        index = (service_name, b32decode(SturdyRef(furl).tubID.upper()))

        previous = self._announcements.get(index)
        if previous is not None:
            if previous[0] == announcement:
                self.log("but we already knew it, ignoring", level=log.NOISY)
                counts["inbound_duplicate"] += 1
                return
            self.log("old announcement being updated", level=log.NOISY)
            counts["inbound_update"] += 1
        self._announcements[index] = (announcement, time.time())

        recipients = self._subscribers.get(service_name, [])
        for rref in recipients:
            counts["outbound_message"] += 1
            counts["outbound_announcements"] += 1
            sent = rref.callRemote("announce", set([announcement]))
            # presumably trap_deadref absorbs failures from disconnected
            # references; anything else is reported through log.err
            sent.addErrback(rrefutil.trap_deadref)
            sent.addErrback(log.err,
                            format="subscriber errored on announcement %(ann)s",
                            ann=announcement, facility="tahoe.introducer",
                            level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe(self, subscriber, service_name):
        """Register subscriber for service_name and send it known announcements.

        A subscriber that is already registered for this service name is
        ignored. Otherwise it is recorded with the current time, set up to
        be removed on disconnect, and sent one "announce" message holding
        every stored announcement for service_name (possibly an empty set).
        """
        self.log("introducer: subscription[%s] request at %s" % (service_name,
                                                                 subscriber))
        self._debug_counts["inbound_subscribe"] += 1
        subscribers = self._subscribers.setdefault(service_name, {})
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL)
            return
        subscribers[subscriber] = time.time()

        def _forget():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber))
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_forget)

        announcements = set()
        for (sn2, _nodeid), (ann, _when) in self._announcements.items():
            if sn2 == service_name:
                announcements.add(ann)

        self._debug_counts["outbound_message"] += 1
        self._debug_counts["outbound_announcements"] += len(announcements)
        sent = subscriber.callRemote("announce", announcements)
        sent.addErrback(rrefutil.trap_deadref)
        sent.addErrback(log.err,
                        format="subscriber errored during subscribe %(anns)s",
                        anns=announcements, facility="tahoe.introducer",
                        level=log.UNUSUAL, umid="mtZepQ")