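"""
Statistics support for Tahoe-LAFS nodes (allmydata/stats.py).

A StatsProvider runs inside a node, accumulates named counters, and collects
samples from registered IStatsProducer objects; it reports them over foolscap
to a central StatsGatherer.  Concrete gatherers either print each report to
stdout (StdOutStatsGatherer) or persist the latest report per client to disk
(PickleStatsGatherer), and GathererApp runs a standalone gatherer process.
"""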

import os
import pickle
import pprint
import sys
import time
from collections import deque

from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError

from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

class LoadMonitor(service.MultiService):
    """Sample the reactor once per second and record how late each scheduled
    call fires, as a rough measure of how heavily loaded this node is."""
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

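# Any object that adapts to IStatsProducer (i.e. offers a get_stats() method
# returning a dict of name->value samples) can be handed to
# StatsProvider.register_producer() below.  A minimal sketch of such a
# producer (hypothetical, not part of this module):
#
#   class UptimeProducer:
#       implements(IStatsProducer)
#       def __init__(self):
#           self.start_time = time.time()
#       def get_stats(self):
#           return {'uptime.seconds': time.time() - self.start_time}
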
class StatsProvider(foolscap.Referenceable, service.MultiService):
    """Run inside a node: accumulate named counters, collect samples from
    registered IStatsProducer objects, and publish both to the configured
    stats gatherer over foolscap."""
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl

        self.counters = {}
        self.stats_producers = []

        self.load_monitor = LoadMonitor(self)
        self.load_monitor.setServiceParent(self)
        self.register_producer(self.load_monitor)

    def startService(self):
        if self.node:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname = self.node.get_config('nickname')
                self.node.tub.connectTo(self.gatherer_furl, self._connected, nickname)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def remote_get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        return { 'counters': self.counters, 'stats': stats }

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')

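# Typical use from inside a node looks roughly like this (a sketch; the real
# call sites live elsewhere in allmydata and may differ in detail):
#
#   provider = StatsProvider(node, gatherer_furl)
#   provider.setServiceParent(node)
#   provider.register_producer(my_producer)       # my_producer is hypothetical
#   provider.count('uploader.files_uploaded', 1)
#
# Once the node's Tub is ready, the provider connects to the gatherer's FURL
# and answers remote get_stats() calls with its counters and producer samples.
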
class StatsGatherer(foolscap.Referenceable, service.MultiService):
    """Central collector: providers announce themselves via remote_provide(),
    and we poll each of them for stats every poll_interval seconds.
    Subclasses must override got_stats() to do something with each report."""
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, tub, basedir):
        service.MultiService.__init__(self)
        self.tub = tub
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

    def startService(self):
        # the Tub must have a location set on it by now
        service.MultiService.startService(self)
        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)
        self.registerGatherer()

    def get_furl(self):
        return self.my_furl

    def registerGatherer(self):
        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)

    def get_tubid(self, rref):
        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add this provider to the polling set (it would pollute the
            # data), and don't bother watching for its disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

class StdOutStatsGatherer(StatsGatherer):
    """StatsGatherer that prints connect/disconnect events and every stats
    report to stdout."""
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    """StatsGatherer that persists the most recent report from each client
    to a pickle file (stats.pickle) in its base directory."""
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, tub, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, tub, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        # write to a temporary file, then move it into place, so a crash
        # mid-write cannot leave a truncated pickle behind
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)

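# The pickle maps tubid -> {'timestamp', 'nickname', 'stats'}; reading it back
# for offline inspection is just (a sketch):
#
#   f = open('stats.pickle', 'rb')
#   gathered = pickle.load(f)
#   f.close()
#   for tubid, sample in gathered.items():
#       print sample['nickname'], sample['stats']
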
class GathererApp(object):
    """Standalone gatherer: create a Tub (generating stats_gatherer.pem and
    remembering its port number in 'portnum'), pick a reachable location,
    and attach a PickleStatsGatherer to it."""
    def __init__(self):
        d = self.setup_tub()
        d.addCallback(self._tub_ready)

    def setup_tub(self):
        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
        self._tub.setOption("logLocalFailures", True)
        self._tub.setOption("logRemoteFailures", True)
        self._tub.startService()
        portnumfile = "portnum"
        try:
            portnum = int(open(portnumfile, "r").read())
        except (EnvironmentError, ValueError):
            portnum = 0
        self._tub.listenOn("tcp:%d" % portnum)
        d = defer.maybeDeferred(get_local_ip_for)
        d.addCallback(self._set_location)
        d.addCallback(lambda res: self._tub)
        return d

    def _set_location(self, local_address):
        if local_address is None:
            local_addresses = ["127.0.0.1"]
        else:
            local_addresses = [local_address, "127.0.0.1"]
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        portnumfile = "portnum"
        open(portnumfile, "w").write("%d\n" % portnum)
        local_addresses = [ "%s:%d" % (addr, portnum,)
                            for addr in local_addresses ]
        assert len(local_addresses) >= 1
        location = ",".join(local_addresses)
        self._tub.setLocation(location)

    def _tub_ready(self, tub):
        sg = PickleStatsGatherer(tub, ".")
        sg.setServiceParent(tub)
        sg.verbose = True
        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)

def main(argv):
    ga = GathererApp()
    reactor.run()

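# Running this file directly (e.g. "python stats.py") starts a standalone
# stats gatherer in the current directory.  It creates stats_gatherer.pem,
# portnum, stats_gatherer.furl, and stats.pickle there, and prints the FURL
# that providers should be pointed at.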
if __name__ == '__main__':
    main(sys.argv)