# src/allmydata/stats.py (tahoe-lafs)

import os
import pickle
import pprint
import sys
import time
from collections import deque

from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.eventual import eventually
from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError

from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

class LoadMonitor(service.MultiService):
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

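# Worked example (illustrative only, not part of the original module): with
# loop_interval=1, if the reactor fires loop() 1.25 seconds after the
# previous run, the recorded sample is 1.25 - 1.0 = 0.25 seconds of delay.
# get_stats() then reports the mean and maximum of the last num_samples
# recorded delays:
#
#   samples = [0.0, 0.25, 0.05]
#   avg = sum(samples) / len(samples)   # 0.1
#   m_x = max(samples)                  # 0.25
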
class CPUUsageMonitor(service.MultiService):
    implements(IStatsProducer)
    HISTORY_LENGTH = 15
    POLL_INTERVAL = 60

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        self.initial_cpu = time.clock()

    def check(self):
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction
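    # Worked example (illustrative): check() runs every POLL_INTERVAL=60
    # seconds, so consecutive samples are about one minute apart and
    # _average_N_minutes(5) spans roughly five minutes. If the last six
    # samples cover 300s of wall-clock time during which 6s of CPU time was
    # consumed, the reported 5-minute average is 6.0 / 300.0 = 0.02, i.e.
    # about 2% CPU usage over that window.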

    def get_stats(self):
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s

class StatsProvider(foolscap.Referenceable, service.MultiService):
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl # might be None

        self.counters = {}
        self.stats_producers = []

        # only run the LoadMonitor (which schedules a timer every second) if
        # there is a gatherer that is going to be paying attention. Our stats
        # are visible through HTTP even without a gatherer, so run the rest
        # of the stats producers (including the once-per-minute
        # CPUUsageMonitor) unconditionally.
        if gatherer_furl:
            self.load_monitor = LoadMonitor(self)
            self.load_monitor.setServiceParent(self)
            self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname_utf8 = self.node.nickname.encode("utf-8")
                self.node.tub.connectTo(self.gatherer_furl,
                                        self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta
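    # Example usage (hypothetical caller; the counter names are only
    # illustrative): other services keep a reference to this provider and
    # bump counters as events occur, e.g.
    #
    #   stats_provider.count("downloader.files_downloaded")
    #   stats_provider.count("downloader.bytes_downloaded", num_bytes)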

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret
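    # The structure returned by get_stats() (and therefore shipped to the
    # gatherer via remote_get_stats) looks roughly like this; the particular
    # names and values are only examples:
    #
    #   {'counters': {'downloader.files_downloaded': 3},
    #    'stats': {'cpu_monitor.1min_avg': 0.02,
    #              'cpu_monitor.total': 12.5}}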

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')

class StatsGatherer(foolscap.Referenceable, service.MultiService):
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, tub, basedir):
        service.MultiService.__init__(self)
        self.tub = tub
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

    def startService(self):
        # the Tub must have a location set on it by now
        service.MultiService.startService(self)
        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)
        self.registerGatherer()

    def get_furl(self):
        return self.my_furl

    def registerGatherer(self):
        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)

    def get_tubid(self, rref):
        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add this client to the poll list (that would pollute the
            # data), and don't bother tracking its disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

class StdOutStatsGatherer(StatsGatherer):
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, tub, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, tub, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)
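    # Note: the unlink-before-rename above matters on platforms where
    # os.rename() will not replace an existing file (e.g. Windows); on POSIX
    # the rename alone would be atomic. To inspect the gathered data offline,
    # something along these lines works (assumed usage, not part of the
    # gatherer itself):
    #
    #   import pickle, pprint
    #   f = open("stats.pickle", "rb")
    #   pprint.pprint(pickle.load(f))
    #   f.close()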

class GathererApp(object):
    def __init__(self):
        d = self.setup_tub()
        d.addCallback(self._tub_ready)

    def setup_tub(self):
        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
        self._tub.setOption("logLocalFailures", True)
        self._tub.setOption("logRemoteFailures", True)
        self._tub.startService()
        portnumfile = "portnum"
        try:
            portnum = int(open(portnumfile, "r").read())
        except (EnvironmentError, ValueError):
            portnum = 0
        self._tub.listenOn("tcp:%d" % portnum)
        d = defer.maybeDeferred(get_local_ip_for)
        d.addCallback(self._set_location)
        d.addCallback(lambda res: self._tub)
        return d

    def _set_location(self, local_address):
        if local_address is None:
            local_addresses = ["127.0.0.1"]
        else:
            local_addresses = [local_address, "127.0.0.1"]
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        portnumfile = "portnum"
        open(portnumfile, "w").write("%d\n" % portnum)
        local_addresses = [ "%s:%d" % (addr, portnum,)
                            for addr in local_addresses ]
        assert len(local_addresses) >= 1
        location = ",".join(local_addresses)
        self._tub.setLocation(location)

    def _tub_ready(self, tub):
        sg = PickleStatsGatherer(tub, ".")
        sg.setServiceParent(tub)
        sg.verbose = True
        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)

def main(argv):
    ga = GathererApp()
    reactor.run()

if __name__ == '__main__':
    main(sys.argv)
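
# Running this module directly (python stats.py) starts a standalone stats
# gatherer in the current directory: it listens on the TCP port recorded in
# the "portnum" file (picking an ephemeral port and writing that file on the
# first run), publishes its FURL in "stats_gatherer.furl", and stores the
# most recent stats from each connected node in "stats.pickle". A node that
# is handed that FURL (as the gatherer_furl argument to StatsProvider above)
# connects at startup and is then polled every poll_interval seconds.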