git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/stats.py
stats_gatherer: verbose debug logging
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / stats.py
1
2 import os
3 import pickle
4 import pprint
5 import sys
6 import time
7 from collections import deque
8
9 from twisted.internet import reactor, defer
10 from twisted.application import service
11 from twisted.application.internet import TimerService
12 from zope.interface import implements
13 import foolscap
14 from foolscap.logging.gatherer import get_local_ip_for
15 from twisted.internet.error import ConnectionDone, ConnectionLost
16 from foolscap import DeadReferenceError
17
18 from allmydata.util import log
19 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
20
class LoadMonitor(service.MultiService):
    """Measure reactor responsiveness and publish it as stats.

    Every ``loop_interval`` seconds a reactor timer fires. The amount by
    which it fired later than requested is recorded as one "delay" sample.
    The most recent ``num_samples`` samples are kept; get_stats() reports
    their mean and maximum.
    """
    implements(IStatsProducer)

    loop_interval = 1   # seconds between timer firings
    num_samples = 60    # size of the sliding window of delay samples

    def __init__(self, provider, warn_if_delay_exceeds=1):
        """
        @param provider: the owning StatsProvider (stored, not used here)
        @param warn_if_delay_exceeds: log an UNUSUAL message whenever a single
            delay sample exceeds this many seconds
        """
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None        # time.time() of the previous loop(), or None
        self.stats = deque()    # recent delay samples, oldest first
        self.timer = None       # pending reactor.callLater handle, or None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        # Forget the last timestamp; otherwise the first sample after a
        # restart would count the whole stopped period as reactor delay.
        self.last = None
        return service.MultiService.stopService(self)

    def loop(self):
        """Record one delay sample, then reschedule ourselves."""
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            # how much later than requested did this call actually fire?
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                # foolscap interpolates format= against the event dict, so the
                # value must be supplied as a keyword argument, not via args=
                log.msg(format='excessive reactor delay (%(delay)ss)',
                        delay=delay, level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        """Return mean and max of the recorded delay samples (0 if none)."""
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }
74
class StatsProvider(foolscap.Referenceable, service.MultiService):
    """Collect a node's counters and producer stats and offer them remotely.

    Once the node's Tub is ready, connects to the stats gatherer at
    ``gatherer_furl`` and calls its 'provide' method with itself, so the
    gatherer can later call remote_get_stats().
    """
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl

        self.counters = {}          # counter name -> accumulated value
        self.stats_producers = []   # IStatsProducer providers

        self.load_monitor = LoadMonitor(self)
        self.load_monitor.setServiceParent(self)
        self.register_producer(self.load_monitor)

    def startService(self):
        if self.node:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname = self.node.get_config('nickname')
                self.node.tub.connectTo(self.gatherer_furl, self._connected, nickname)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta):
        """Add ``delta`` to counter ``name`` (created at 0 if missing)."""
        self.counters[name] = self.counters.get(name, 0) + delta

    def register_producer(self, stats_producer):
        """Include ``stats_producer.get_stats()`` in future stats replies."""
        self.stats_producers.append(IStatsProducer(stats_producer))

    def remote_get_stats(self):
        """Return {'counters': ..., 'stats': merged producer stats}."""
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        # verbose debug logging of every reply; foolscap interpolates format=
        # against the event dict, so pass the value as a keyword argument
        log.msg(format='get_stats() -> %(result)s',
                result=pprint.pformat(ret), level=12)
        return ret

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')
114
115     def _connected(self, gatherer, nickname):
116         gatherer.callRemoteOnly('provide', self, nickname or '')
117
class StatsGatherer(foolscap.Referenceable, service.MultiService):
    """Poll registered stats providers every ``poll_interval`` seconds.

    Providers announce themselves via remote_provide(). Each poll calls
    'get_stats' on every known provider and hands the result to got_stats(),
    which subclasses must implement.
    """
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, tub, basedir):
        service.MultiService.__init__(self)
        self.tub = tub
        self.basedir = basedir

        self.clients = {}     # tubid -> provider reference
        self.nicknames = {}   # tubid -> provider nickname
        self.my_furl = None   # set by registerGatherer()

    def startService(self):
        # the Tub must have a location set on it by now
        service.MultiService.startService(self)
        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)
        self.registerGatherer()

    def get_furl(self):
        """Return our FURL, or None if registerGatherer() has not run yet."""
        return self.my_furl

    def registerGatherer(self):
        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)

    def get_tubid(self, rref):
        # NOTE(review): relies on foolscap internals (rref.tracker) to recover
        # the remote TubID; '<unauth>' is treated as "unknown" by callers.
        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print("WARNING: failed to get tubid for %s (%s)" % (provider, nickname))
            # don't add to clients to poll (polluting data) don't care about disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        # Iterate over a snapshot: a failed call can errback synchronously
        # and remove the client from self.clients while we are looping.
        for tubid, client in list(self.clients.items()):
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails. Several
        # requests to the same client can be outstanding (one per poll), so
        # the entry may already be gone; don't raise KeyError in that case.
        self.clients.pop(tubid, None)
        self.nicknames.pop(tubid, None)
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()
178
class StdOutStatsGatherer(StatsGatherer):
    """StatsGatherer that pretty-prints each client's stats to stdout.

    When ``verbose`` is true, client connects and disconnects are printed too.
    """
    verbose = True

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print('connect "%s" [%s]' % (nickname, tubid))
            # Hand the nickname to the disconnect callback directly: by the
            # time it fires, lost_client() may already have dropped it from
            # self.nicknames.
            provider.notifyOnDisconnect(self.announce_lost_client,
                                        tubid, nickname)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid, nickname=None):
        if nickname is None:
            nickname = self.nicknames.get(tubid)
        print('disconnect "%s" [%s]:' % (nickname, tubid))

    def got_stats(self, stats, tubid, nickname):
        print('"%s" [%s]:' % (nickname, tubid))
        pprint.pprint(stats)
194
class PickleStatsGatherer(StdOutStatsGatherer):
    """StatsGatherer that persists the latest stats per client to a pickle.

    The file ``<basedir>/stats.pickle`` holds a dict keyed by tubid, each
    value being {'timestamp', 'nickname', 'stats'}; it is rewritten after
    every successful poll of a client.
    """
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, tub, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, tub, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            # NOTE: pickle.load can execute arbitrary code; this file is only
            # ever written by dump_pickle() and must not come from untrusted
            # sources.
            f = open(self.picklefile, 'rb')
            try:
                self.gathered_stats = pickle.load(f)
            finally:
                f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        """Write gathered_stats to a temp file, then move it into place."""
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        try:
            pickle.dump(self.gathered_stats, f)
        finally:
            f.close()
        # os.rename refuses to replace an existing file on Windows, so the
        # old file is removed first.
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)
225
class GathererApp(object):
    """Stand-alone stats-gatherer process rooted in the current directory.

    Creates a Tub (cert in ./stats_gatherer.pem), listens on the TCP port
    remembered in ./portnum (or an ephemeral one), and attaches a
    PickleStatsGatherer whose FURL is printed once the Tub has a location.
    """
    portnumfile = "portnum"   # remembers our listening port across restarts

    def __init__(self):
        d = self.setup_tub()
        d.addCallback(self._tub_ready)

    def setup_tub(self):
        """Start the Tub; return a Deferred firing with it once located."""
        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
        self._tub.setOption("logLocalFailures", True)
        self._tub.setOption("logRemoteFailures", True)
        self._tub.startService()
        portnum = self._read_portnum()
        self._tub.listenOn("tcp:%d" % portnum)
        d = defer.maybeDeferred(get_local_ip_for)
        d.addCallback(self._set_location)
        d.addCallback(lambda res: self._tub)
        return d

    def _read_portnum(self):
        # Return the saved port number, or 0 (any free port) if unavailable.
        try:
            f = open(self.portnumfile, "r")
            try:
                return int(f.read())
            finally:
                f.close()
        except (EnvironmentError, ValueError):
            return 0

    def _write_portnum(self, portnum):
        # Persist the port we actually got so restarts reuse it.
        f = open(self.portnumfile, "w")
        try:
            f.write("%d\n" % portnum)
        finally:
            f.close()

    def _set_location(self, local_address):
        if local_address is None:
            local_addresses = ["127.0.0.1"]
        else:
            local_addresses = [local_address, "127.0.0.1"]
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        self._write_portnum(portnum)
        local_addresses = [ "%s:%d" % (addr, portnum,)
                            for addr in local_addresses ]
        location = ",".join(local_addresses)
        self._tub.setLocation(location)

    def _tub_ready(self, tub):
        sg = PickleStatsGatherer(tub, ".")
        sg.setServiceParent(tub)
        sg.verbose = True
        print('\nStatsGatherer: %s\n' % (sg.get_furl(),))
267
def main(argv):
    """Start a stand-alone stats gatherer and run the reactor.

    @param argv: command-line arguments (not consulted)
    """
    app = GathererApp()
    reactor.run()
271
if __name__ == "__main__":
    # script entry point: run the gatherer with this process's arguments
    main(sys.argv)