import os
import pickle
import pprint
import time
from collections import deque

from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub

from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

class LoadMonitor(service.MultiService):
    """Measure how late the reactor's once-per-second timer fires: each
    sample is the extra delay (in seconds) beyond loop_interval, and
    get_stats() reports the average and maximum over the last num_samples
    samples."""
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

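# A minimal usage sketch, assuming a running StatsProvider-like parent
# service 'provider'; the numbers shown are hypothetical:
#
#   lm = LoadMonitor(provider)
#   lm.setServiceParent(provider)   # starts the once-per-second sampling loop
#   lm.get_stats()
#   # -> {'load_monitor.avg_load': 0.003, 'load_monitor.max_load': 0.017}
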
class CPUUsageMonitor(service.MultiService):
    """Sample process CPU time (time.clock()) every POLL_INTERVAL seconds
    and report 1/5/15-minute moving averages of the fraction of CPU used,
    plus the total CPU-seconds consumed since startup."""
    implements(IStatsProducer)
    HISTORY_LENGTH = 15
    POLL_INTERVAL = 60

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        self.initial_cpu = time.clock()

    def check(self):
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction

    def get_stats(self):
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s


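# A worked sketch of the moving-average arithmetic above, with hypothetical
# sample values. Samples arrive every POLL_INTERVAL=60s, so the 5-minute
# average compares the newest sample against the one six entries back:
#
#   samples[-6] = (t,       12.0)   # (wall-clock seconds, cpu-seconds)
#   samples[-1] = (t + 300, 13.2)
#   elapsed_wall = 300.0
#   elapsed_cpu  = 1.2
#   fraction     = 1.2 / 300.0      # 0.004, i.e. 0.4% CPU over the last 5min
#
# and that fraction is reported as "cpu_monitor.5min_avg".
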
class StatsProvider(Referenceable, service.MultiService):
    """Accumulate counters and stats from registered IStatsProducers for
    this node. If a gatherer furl is configured, connect to the gatherer so
    that it can poll us via remote_get_stats()."""
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl # might be None

        self.counters = {}
        self.stats_producers = []

        # Only run the LoadMonitor (which fires a timer every second) if
        # there is a gatherer that will be paying attention. Our stats are
        # visible through HTTP even without a gatherer, so run the rest of
        # the stats producers (including the once-per-minute CPUUsageMonitor)
        # unconditionally.
        if gatherer_furl:
            self.load_monitor = LoadMonitor(self)
            self.load_monitor.setServiceParent(self)
            self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname_utf8 = self.node.nickname.encode("utf-8")
                self.node.tub.connectTo(self.gatherer_furl,
                                        self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')


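# A minimal usage sketch of the StatsProvider API, assuming a node that has
# created one as 'provider'; the counter names and values are hypothetical:
#
#   provider.count("uploader.files_uploaded")          # increment by 1
#   provider.count("uploader.bytes_uploaded", nbytes)  # increment by nbytes
#   provider.register_producer(my_stats_producer)      # any IStatsProducer
#   provider.get_stats()
#   # -> {'counters': {'uploader.files_uploaded': 1, ...},
#   #     'stats': {'cpu_monitor.total': 1.7, ...}}
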
class StatsGatherer(Referenceable, service.MultiService):
    """Accept 'provide' calls from StatsProvider clients and poll each of
    them for stats every poll_interval seconds. Subclasses decide what to do
    with each response by overriding got_stats()."""
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, basedir):
        service.MultiService.__init__(self)
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)

    def get_tubid(self, rref):
        return rref.getRemoteTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add this provider to the set we poll (that would pollute
            # the data), and don't bother watching for its disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

class StdOutStatsGatherer(StatsGatherer):
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        # write to a temporary file, then rename it into place, so an
        # interrupted write cannot leave a truncated stats.pickle behind
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)

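# A minimal sketch of reading the gathered stats back out of stats.pickle,
# e.g. from a monitoring script; 'basedir' is wherever the gatherer runs:
#
#   import os, pickle, pprint
#   data = pickle.load(open(os.path.join(basedir, "stats.pickle"), "rb"))
#   for tubid, record in data.items():
#       pprint.pprint((record['nickname'], record['stats']['counters']))
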
class StatsGathererService(service.MultiService):
    """Run a PickleStatsGatherer inside its own Tub: listen on a remembered
    TCP port and publish the gatherer's furl in BASEDIR/stats_gatherer.furl
    for nodes to report to."""
    furl_file = "stats_gatherer.furl"

    def __init__(self, basedir=".", verbose=False):
        service.MultiService.__init__(self)
        self.basedir = basedir
        self.tub = Tub(certFile=os.path.join(self.basedir,
                                             "stats_gatherer.pem"))
        self.tub.setServiceParent(self)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)

        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
        self.stats_gatherer.setServiceParent(self)

        portnumfile = os.path.join(self.basedir, "portnum")
        try:
            portnum = open(portnumfile, "r").read()
        except EnvironmentError:
            portnum = None
        self.listener = self.tub.listenOn(portnum or "tcp:0")
        d = self.tub.setLocationAutomatically()
        if portnum is None:
            d.addCallback(self.save_portnum)
        d.addCallback(self.tub_ready)
        d.addErrback(log.err)

    def save_portnum(self, junk):
        portnum = self.listener.getPortnum()
        portnumfile = os.path.join(self.basedir, 'portnum')
        open(portnumfile, 'wb').write('%d\n' % (portnum,))

    def tub_ready(self, ignored):
        ff = os.path.join(self.basedir, self.furl_file)
        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
                                                        furlFile=ff)
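
# A minimal sketch of running the gatherer under twistd via a .tac file;
# the application name here is arbitrary:
#
#   from twisted.application import service
#   from allmydata.stats import StatsGathererService
#   application = service.Application("stats_gatherer")
#   StatsGathererService(verbose=True).setServiceParent(application)
#
# twistd then loads the .tac, the Tub starts listening, and the furl to hand
# to each node's stats provider appears in stats_gatherer.furl.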