]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/stats.py
stats: fix service issues
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / stats.py
1
2 import os
3 import pickle
4 import pprint
5 import sys
6 import time
7 from collections import deque
8
9 from twisted.internet import reactor, defer
10 from twisted.application import service
11 from twisted.application.internet import TimerService
12 from zope.interface import implements
13 import foolscap
14 from foolscap.logging.gatherer import get_local_ip_for
15
16 from allmydata.util import log
17 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
18
class LoadMonitor(service.MultiService):
    """Gauge reactor load by measuring how late a periodic timer fires.

    Every ``loop_interval`` seconds loop() runs. The difference between the
    actual and the expected time since the previous run is recorded as a
    "delay" sample. Only the most recent ``num_samples`` samples are kept.
    get_stats() reports their mean and maximum. A delay greater than
    ``warn_if_delay_exceeds`` seconds is logged at UNUSUAL level.
    """
    implements(IStatsProducer)

    loop_interval = 1  # seconds between timer firings
    num_samples = 60   # number of most-recent delay samples retained

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider  # stored only; not used by this class
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None  # time.time() of the previous loop() run, if any
        self.stats = deque()  # delay samples in seconds, oldest first
        # Handle of the pending reactor.callLater, kept so stopService can
        # cancel it.
        self._delayed_call = None

    def startService(self):
        if not self.started:
            self.started = True
            # Forget the timestamp from any previous run. Otherwise the time
            # spent stopped would be recorded as one huge reactor delay.
            self.last = None
            self._delayed_call = reactor.callLater(self.loop_interval,
                                                   self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        # Cancel the pending tick. Otherwise a stop quickly followed by a
        # start would leave the old callLater alive next to the new one, and
        # two loops would run.
        if self._delayed_call is not None and self._delayed_call.active():
            self._delayed_call.cancel()
        self._delayed_call = None
        # Let MultiService stop child services and clear its running flag.
        return service.MultiService.stopService(self)

    def loop(self):
        """Record one delay sample (after the first run) and reschedule."""
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            # Extra time beyond the requested interval is the reactor delay.
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            # Keep only the newest num_samples samples.
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self._delayed_call = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        """Return the mean and maximum of the retained delay samples.

        Both values are in seconds. Both are 0 when no samples exist yet.
        """
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }
66
class StatsProvider(foolscap.Referenceable, service.MultiService):
    """Collect counters and producer stats and serve them to a gatherer.

    Counters are adjusted with count(). Objects adaptable to IStatsProducer
    are added with register_producer(). A remote gatherer fetches both
    through remote_get_stats().

    When started with a node and a gatherer FURL, the provider connects to
    that gatherer once the node's Tub is ready. It then announces itself by
    calling the gatherer's 'provide' method with the node's nickname.
    """
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl

        self.counters = {}        # counter name -> accumulated value
        self.stats_producers = [] # IStatsProducer providers polled for stats

        # A LoadMonitor child service always reports reactor-delay stats.
        self.load_monitor = LoadMonitor(self)
        self.load_monitor.setServiceParent(self)
        self.register_producer(self.load_monitor)

    def startService(self):
        # Only connect when there is both a node (with a Tub) and a gatherer
        # FURL. Without a FURL, connectTo() has nothing to connect to.
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname = self.node.get_config('nickname')
                self.node.tub.connectTo(self.gatherer_furl, self._connected, nickname)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta):
        """Add ``delta`` to counter ``name``. A new counter starts at 0."""
        self.counters[name] = self.counters.get(name, 0) + delta

    def register_producer(self, stats_producer):
        """Add ``stats_producer`` (adapted to IStatsProducer) to the polled set."""
        self.stats_producers.append(IStatsProducer(stats_producer))

    def remote_get_stats(self):
        """Return {'counters': ..., 'stats': ...} for a remote gatherer.

        The 'stats' dict merges every producer's get_stats() result. A key
        reported by several producers takes the value from the later-
        registered producer.
        """
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        return { 'counters': self.counters, 'stats': stats }

    def _connected(self, gatherer, nickname):
        # Called by connectTo() on each (re)connection to the gatherer.
        gatherer.callRemote('provide', self, nickname or '')
106
class StatsGatherer(foolscap.Referenceable, service.MultiService):
    """Periodically poll connected stats providers.

    Providers announce themselves via remote_provide(). Every
    ``poll_interval`` seconds each known provider is asked for its stats,
    and each result is handed to got_stats(). Subclasses must implement
    got_stats().
    """
    implements(RIStatsGatherer)

    poll_interval = 60  # seconds between polls of all providers

    def __init__(self, tub):
        service.MultiService.__init__(self)
        self.tub = tub

        self.clients = {}    # tubid -> provider remote reference
        self.nicknames = {}  # tubid -> nickname announced by that provider
        self.timer = None    # TimerService driving poll(); made on first start

    def startService(self):
        # Create the polling TimerService only once. Creating it on every
        # start would attach another timer child each time the gatherer is
        # restarted, multiplying the poll rate.
        if self.timer is None:
            self.timer = TimerService(self.poll_interval, self.poll)
            self.timer.setServiceParent(self)
        service.MultiService.startService(self)

    def get_furl(self):
        """Register this gatherer with the Tub and return its FURL.

        The furlFile argument names 'stats_gatherer.furl'. Per foolscap's
        registerReference, this presumably lets the same FURL be reused
        across restarts.
        """
        return self.tub.registerReference(self, furlFile='stats_gatherer.furl')

    def get_tubid(self, rref):
        """Return the tubid of the Tub that ``rref`` points at, taken from its FURL."""
        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()

    def remote_provide(self, provider, nickname):
        """Remote method: start polling ``provider``, known as ``nickname``."""
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print("WARNING: failed to get tubid for %s (%s)" % (provider, nickname))
            # don't add to clients to poll (polluting data) don't care about disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname
        provider.notifyOnDisconnect(self.lost_client, tubid)

    def lost_client(self, tubid):
        """Forget the provider with ``tubid`` after it disconnects."""
        del self.clients[tubid]
        del self.nicknames[tubid]

    def poll(self):
        """Ask every known provider for stats and pass results to got_stats.

        Failures of the remote call are not handled here.
        """
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallback(self.got_stats, tubid, nickname)

    def got_stats(self, stats, tubid, nickname):
        """Handle one provider's stats; must be overridden by subclasses."""
        raise NotImplementedError()
152
class StdOutStatsGatherer(StatsGatherer):
    """StatsGatherer that prints connects, disconnects and polled stats to stdout."""

    def remote_provide(self, provider, nickname):
        # The connect line is printed before the base class runs. It is
        # therefore printed even for a provider the base class then rejects.
        print('connect "%s" [%s]' % (nickname, self.get_tubid(provider)))
        StatsGatherer.remote_provide(self, provider, nickname)

    def lost_client(self, tubid):
        # Read the nickname before the base class deletes it.
        name = self.nicknames[tubid]
        print('disconnect "%s" [%s]:' % (name, tubid))
        StatsGatherer.lost_client(self, tubid)

    def got_stats(self, stats, tubid, nickname):
        header = '"%s" [%s]:' % (nickname, tubid)
        print(header)
        pprint.pprint(stats)
166
class PickleStatsGatherer(StdOutStatsGatherer):
    """Gatherer that saves the latest stats of each provider to a pickle file.

    It derives from StdOutStatsGatherer rather than StatsGatherer to keep the
    connect/disconnect notifications on stdout. got_stats() is overridden,
    so polled stats go to the pickle file instead of stdout.

    The pickled object is a dict:
    tubid -> {'timestamp': ..., 'nickname': ..., 'stats': ...}.
    """
    def __init__(self, tub, picklefile):
        StatsGatherer.__init__(self, tub)
        self.picklefile = picklefile

        # NOTE(review): pickle.load can execute arbitrary code. This file is
        # assumed to be written only by dump_pickle() below and must never
        # come from an untrusted source.
        if os.path.exists(picklefile):
            f = open(picklefile, 'rb')
            try:
                self.gathered_stats = pickle.load(f)
            finally:
                f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        """Record ``stats`` for ``tubid`` with the current time, then save."""
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        """Write gathered_stats to picklefile via a temporary file.

        The temporary file is fully written before the old file is replaced.
        A failure while pickling therefore leaves the previous file intact.
        """
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        try:
            pickle.dump(self.gathered_stats, f)
        finally:
            f.close()
        # Remove the old file first, presumably because os.rename does not
        # replace an existing destination on Windows.
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)
195
class GathererApp(object):
    """Stand-alone stats gatherer program.

    Creates a Tub whose certificate is kept in 'stats_gatherer.pem'. The Tub
    listens on the TCP port recorded in the port-number file, or on any free
    port when there is none. The app then publishes a PickleStatsGatherer
    that writes to 'stats.pickle' and prints its FURL.
    """
    # File in the current directory that remembers the listening port
    # between runs.
    portnumfile = "portnum"

    def __init__(self):
        d = self.setup_tub()
        d.addCallback(self._tub_ready)

    def setup_tub(self):
        """Create, configure and start the Tub.

        Returns a Deferred that fires with the Tub once its location is set.
        """
        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
        self._tub.setOption("logLocalFailures", True)
        self._tub.setOption("logRemoteFailures", True)
        self._tub.startService()
        self._tub.listenOn("tcp:%d" % self._read_portnum())
        d = defer.maybeDeferred(get_local_ip_for)
        d.addCallback(self._set_location)
        d.addCallback(lambda res: self._tub)
        return d

    def _read_portnum(self):
        """Return the port saved by a previous run, or 0 (any free port)."""
        try:
            f = open(self.portnumfile, "r")
            try:
                return int(f.read())
            finally:
                f.close()
        except (EnvironmentError, ValueError):
            return 0

    def _set_location(self, local_address):
        """Save the actual listening port and advertise it on each address.

        ``local_address`` may be None. Either way, 127.0.0.1 is advertised.
        """
        if local_address is None:
            local_addresses = ["127.0.0.1"]
        else:
            local_addresses = [local_address, "127.0.0.1"]
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        f = open(self.portnumfile, "w")
        try:
            f.write("%d\n" % portnum)
        finally:
            f.close()
        location = ",".join(["%s:%d" % (addr, portnum)
                             for addr in local_addresses])
        self._tub.setLocation(location)

    def _tub_ready(self, tub):
        """Attach a PickleStatsGatherer to ``tub`` and print its FURL."""
        sg = PickleStatsGatherer(tub, 'stats.pickle')
        sg.setServiceParent(tub)
        sg.verbose = True
        print('\nStatsGatherer: %s\n' % (sg.get_furl(),))
237
def main(argv):
    """Start the stand-alone stats gatherer and run the reactor until it stops.

    ``argv`` is accepted for the usual entry-point signature but is not used.
    """
    app = GathererApp()  # the local keeps the app referenced while the reactor runs
    reactor.run()

if __name__ == '__main__':
    main(argv=sys.argv)