# src/allmydata/stats.py

import os
import pickle
import pprint
import time
from collections import deque

from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub

from allmydata.util import log
from allmydata.util.encodingutil import quote_output
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

class LoadMonitor(service.MultiService):
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

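# Example (illustrative sketch): what LoadMonitor reports. Each second,
# loop() measures how much later than loop_interval the reactor actually ran
# it; get_stats() returns the average and maximum of the last num_samples of
# those delays, in seconds. The values below are made up:
#
#   lm = LoadMonitor(provider=None)  # the provider argument is stored but unused here
#   # ... after the service has been started and the reactor has run a while ...
#   lm.get_stats()
#   => {'load_monitor.avg_load': 0.002,
#       'load_monitor.max_load': 0.017}
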
class CPUUsageMonitor(service.MultiService):
    implements(IStatsProducer)
    HISTORY_LENGTH = 15
    POLL_INTERVAL = 60

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        self.initial_cpu = time.clock()

    def check(self):
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction

    def get_stats(self):
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s

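# Example (illustrative): the arithmetic behind _average_N_minutes(). With
# POLL_INTERVAL=60, a sample taken "size" entries back is roughly "size"
# minutes old, so the N-minute average is the CPU seconds consumed in that
# window divided by the wall-clock seconds elapsed. The sample values here
# are made up:
#
#   samples = [(1000.0, 1.00), (1060.0, 1.30), (1120.0, 1.90)]
#   one_min = (samples[-1][1] - samples[-2][1]) / (samples[-1][0] - samples[-2][0])
#   # => 0.60 / 60.0 = 0.01, i.e. about 1% CPU over the last minute
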
class StatsProvider(Referenceable, service.MultiService):
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl # might be None

        self.counters = {}
        self.stats_producers = []

        # Only run the LoadMonitor (which schedules a timer every second) if
        # there is a gatherer that will be paying attention. Our stats are
        # visible through HTTP even without a gatherer, so the rest of the
        # producers (including the once-per-minute CPUUsageMonitor) run
        # unconditionally.
        if gatherer_furl:
            self.load_monitor = LoadMonitor(self)
            self.load_monitor.setServiceParent(self)
            self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname_utf8 = self.node.nickname.encode("utf-8")
                self.node.tub.connectTo(self.gatherer_furl,
                                        self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')

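# Example (illustrative sketch): the node-side API. The 'node' object and the
# counter names below are hypothetical; in a real Tahoe node the StatsProvider
# is created by the node itself and handed the gatherer furl from the node's
# configuration.
#
#   provider = StatsProvider(node, gatherer_furl=None)    # no gatherer; stats stay visible over HTTP
#   provider.count("downloader.files_downloaded")         # increment by 1 (the default delta)
#   provider.count("downloader.bytes_downloaded", 12345)  # increment by an explicit delta
#   provider.get_stats()
#   => {'counters': {'downloader.files_downloaded': 1,
#                    'downloader.bytes_downloaded': 12345},
#       'stats': {'cpu_monitor.total': 0.8, ...}}
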
class StatsGatherer(Referenceable, service.MultiService):
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, basedir):
        service.MultiService.__init__(self)
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)

    def get_tubid(self, rref):
        return rref.getRemoteTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add it to the set of clients to poll (that would pollute
            # the data), and don't bother tracking its disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

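# A minimal subclass sketch: StatsGatherer polls every registered provider
# once per poll_interval and hands each reply to got_stats(), so a subclass
# only has to decide what to do with the data. This logging variant is purely
# illustrative and is not used anywhere below.
class _ExampleLoggingStatsGatherer(StatsGatherer):
    def got_stats(self, stats, tubid, nickname):
        log.msg("stats from %s [%s]: %r" % (nickname, tubid, stats))
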
class StdOutStatsGatherer(StatsGatherer):
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            try:
                self.gathered_stats = pickle.load(f)
            except Exception:
                print ("Error while attempting to load pickle file %s.\nYou may need to delete this file.\n" %
                       quote_output(os.path.abspath(self.picklefile)))
                raise
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)

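# Example (illustrative): reading the gathered data back out of stats.pickle
# offline. The path is a placeholder; the pickle maps each tubid to a dict
# with 'timestamp', 'nickname', and 'stats' keys, where 'stats' is the full
# {'counters': ..., 'stats': ...} dict returned by the provider.
#
#   f = open("/path/to/gatherer-basedir/stats.pickle", "rb")
#   gathered = pickle.load(f)
#   f.close()
#   for tubid, record in gathered.items():
#       print record['nickname'], record['stats']['stats'].get('cpu_monitor.total')
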
class StatsGathererService(service.MultiService):
    furl_file = "stats_gatherer.furl"

    def __init__(self, basedir=".", verbose=False):
        service.MultiService.__init__(self)
        self.basedir = basedir
        self.tub = Tub(certFile=os.path.join(self.basedir,
                                             "stats_gatherer.pem"))
        self.tub.setServiceParent(self)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)

        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
        self.stats_gatherer.setServiceParent(self)

        portnumfile = os.path.join(self.basedir, "portnum")
        try:
            portnum = open(portnumfile, "r").read()
        except EnvironmentError:
            portnum = None
        self.listener = self.tub.listenOn(portnum or "tcp:0")
        d = self.tub.setLocationAutomatically()
        if portnum is None:
            d.addCallback(self.save_portnum)
        d.addCallback(self.tub_ready)
        d.addErrback(log.err)

    def save_portnum(self, junk):
        portnum = self.listener.getPortnum()
        portnumfile = os.path.join(self.basedir, 'portnum')
        open(portnumfile, 'wb').write('%d\n' % (portnum,))

    def tub_ready(self, ignored):
        ff = os.path.join(self.basedir, self.furl_file)
        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
                                                        furlFile=ff)
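
# Example (illustrative): a minimal .tac file that runs the gatherer under
# twistd. The basedir is a placeholder; twistd would be started from that
# directory with "twistd -y <tacfile>".
#
#   from twisted.application import service
#   from allmydata.stats import StatsGathererService
#
#   application = service.Application("stats_gatherer")
#   StatsGathererService(basedir=".", verbose=True).setServiceParent(application)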