# src/allmydata/stats.py

import os
import pickle
import pprint
import sys
import time
from collections import deque

from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.eventual import eventually
from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError

from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

# LoadMonitor measures how late the reactor is in firing a once-per-second
# timer; the accumulated delays give a rough picture of how busy the node is.
class LoadMonitor(service.MultiService):
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%(delay)ss)',
                        delay=delay, level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

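# A minimal sketch (not part of the module) of how the collected delay
# samples turn into the reported stats; the numbers below are made up:
#
#   lm = LoadMonitor(provider=None)
#   lm.stats.extend([0.25, 0.75, 0.5])
#   lm.get_stats()
#   # -> {'load_monitor.avg_load': 0.5, 'load_monitor.max_load': 0.75}
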
class CPUUsageMonitor(service.MultiService):
    implements(IStatsProducer)
    HISTORY_LENGTH = 15
    POLL_INTERVAL = 60

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        self.initial_cpu = time.clock()

    def check(self):
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction

    def get_stats(self):
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s

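# A rough sketch (illustrative only, not the project's actual tests) of how
# the moving average can be exercised without waiting for the TimerService:
# append synthetic (wall-clock, CPU-seconds) samples and query directly.
#
#   m = CPUUsageMonitor()
#   m.samples = [(0.0, 0.0), (60.0, 6.0)]   # 6s of CPU used over 60s of wall time
#   m._average_N_minutes(1)                 # -> 0.1 (10% CPU over the last minute)
#   m._average_N_minutes(5)                 # -> None (not enough samples yet)
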
class StatsProvider(foolscap.Referenceable, service.MultiService):
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl

        self.counters = {}
        self.stats_producers = []

        self.load_monitor = LoadMonitor(self)
        self.load_monitor.setServiceParent(self)
        self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        if self.node:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname = self.node.get_config('nickname')
                self.node.tub.connectTo(self.gatherer_furl, self._connected, nickname)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')

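# A hypothetical usage sketch (the names below are illustrative, not part of
# this module): node code bumps named counters and can register extra
# producers, and get_stats() merges everything into one report.
#
#   provider = StatsProvider(node, gatherer_furl)
#   provider.count('uploader.files_uploaded')        # counter starts at 0, now 1
#   provider.count('uploader.bytes_uploaded', 1024)
#
#   class QueueStats:
#       implements(IStatsProducer)
#       def get_stats(self):
#           return {'queue.depth': 3}
#   provider.register_producer(QueueStats())
#
#   provider.get_stats()
#   # -> {'counters': {'uploader.files_uploaded': 1, 'uploader.bytes_uploaded': 1024},
#   #     'stats': {'load_monitor.avg_load': ..., 'cpu_monitor.total': ..., 'queue.depth': 3}}
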
class StatsGatherer(foolscap.Referenceable, service.MultiService):
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, tub, basedir):
        service.MultiService.__init__(self)
        self.tub = tub
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

    def startService(self):
        # the Tub must have a location set on it by now
        service.MultiService.startService(self)
        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)
        self.registerGatherer()

    def get_furl(self):
        return self.my_furl

    def registerGatherer(self):
        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)

    def get_tubid(self, rref):
        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add it to the set of clients we poll (that would pollute
            # the data), and we don't care about its disconnect either
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            # anything lost_client() re-raises, plus errors from got_stats(),
            # ends up here
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

class StdOutStatsGatherer(StatsGatherer):
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, tub, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, tub, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        # write the new pickle to a temporary file, then move it into place,
        # so a crash mid-write can't destroy the previously gathered data
        # (the old file is unlinked first because os.rename won't overwrite
        # an existing file on some platforms)
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)

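# Reading the gathered data back for offline inspection is just a pickle load.
# A minimal sketch (assuming a stats.pickle produced by the class above):
#
#   f = open("stats.pickle", "rb")
#   gathered = pickle.load(f)
#   f.close()
#   for tubid, entry in gathered.items():
#       print entry['nickname'], entry['stats']['stats'].get('cpu_monitor.total')
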
class GathererApp(object):
    def __init__(self):
        d = self.setup_tub()
        d.addCallback(self._tub_ready)

    def setup_tub(self):
        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
        self._tub.setOption("logLocalFailures", True)
        self._tub.setOption("logRemoteFailures", True)
        self._tub.startService()
        portnumfile = "portnum"
        try:
            portnum = int(open(portnumfile, "r").read())
        except (EnvironmentError, ValueError):
            portnum = 0
        self._tub.listenOn("tcp:%d" % portnum)
        d = defer.maybeDeferred(get_local_ip_for)
        d.addCallback(self._set_location)
        d.addCallback(lambda res: self._tub)
        return d

    def _set_location(self, local_address):
        if local_address is None:
            local_addresses = ["127.0.0.1"]
        else:
            local_addresses = [local_address, "127.0.0.1"]
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        portnumfile = "portnum"
        open(portnumfile, "w").write("%d\n" % portnum)
        local_addresses = [ "%s:%d" % (addr, portnum,)
                            for addr in local_addresses ]
        assert len(local_addresses) >= 1
        location = ",".join(local_addresses)
        self._tub.setLocation(location)

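    # For example (hypothetical values): if get_local_ip_for() returned
    # "192.0.2.5" and the listener ended up on port 47806, the portnum file
    # would contain "47806" and the tub location would be set to
    # "192.0.2.5:47806,127.0.0.1:47806".
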
    def _tub_ready(self, tub):
        sg = PickleStatsGatherer(tub, ".")
        sg.setServiceParent(tub)
        sg.verbose = True
        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)

def main(argv):
    ga = GathererApp()
    reactor.run()

if __name__ == '__main__':
    main(sys.argv)