import os
import pickle
import pprint
import time
from collections import deque

from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
from twisted.internet.error import ConnectionDone, ConnectionLost

from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer

class LoadMonitor(service.MultiService):
    implements(IStatsProducer)

    loop_interval = 1
    num_samples = 60

    def __init__(self, provider, warn_if_delay_exceeds=1):
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None
        self.stats = deque()
        self.timer = None

    def startService(self):
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }

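# How the LoadMonitor numbers read (illustrative values, not from this
# module): with loop_interval=1, if loop() actually fires 1.25s after it
# was scheduled, a reactor delay of 0.25s is recorded. get_stats() then
# reports the mean and maximum of the last num_samples=60 such delays:
#
#     {'load_monitor.avg_load': 0.02,   # seconds of lag, on average
#      'load_monitor.max_load': 0.25}   # worst single sample
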
class CPUUsageMonitor(service.MultiService):
    implements(IStatsProducer)
    HISTORY_LENGTH = 15
    POLL_INTERVAL = 60

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        self.initial_cpu = time.clock()

    def check(self):
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction

    def get_stats(self):
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s


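# Worked example for _average_N_minutes (illustrative values): with
# POLL_INTERVAL=60, _average_N_minutes(5) needs at least 6 samples. If
# 300s of wall-clock time elapsed between samples[-6] and samples[-1]
# while the process accumulated 15s of CPU time, the reported 5-minute
# average is 15.0 / 300.0 = 0.05, i.e. the process used 5% of one CPU.
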
class StatsProvider(Referenceable, service.MultiService):
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl # might be None

        self.counters = {}
        self.stats_producers = []

        # Only run the LoadMonitor (which schedules a timer every second)
        # if there is a gatherer that will be paying attention. Our stats
        # are visible through HTTP even without a gatherer, so run the rest
        # of the stats producers (including the once-per-minute
        # CPUUsageMonitor) unconditionally.
        if gatherer_furl:
            self.load_monitor = LoadMonitor(self)
            self.load_monitor.setServiceParent(self)
            self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname_utf8 = self.node.nickname.encode("utf-8")
                self.node.tub.connectTo(self.gatherer_furl,
                                        self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        gatherer.callRemoteOnly('provide', self, nickname or '')


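# Sketch of how node code uses the provider (the counter names here are
# hypothetical): count() bumps a named counter, and get_stats() merges
# all counters with the output of every registered IStatsProducer.
#
#     provider.count("uploader.files_uploaded")
#     provider.count("uploader.bytes_uploaded", 12345)
#     provider.get_stats()
#     # => {'counters': {'uploader.files_uploaded': 1,
#     #                  'uploader.bytes_uploaded': 12345},
#     #     'stats': {'cpu_monitor.total': ..., ...}}
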
class StatsGatherer(Referenceable, service.MultiService):
    implements(RIStatsGatherer)

    poll_interval = 60

    def __init__(self, basedir):
        service.MultiService.__init__(self)
        self.basedir = basedir

        self.clients = {}
        self.nicknames = {}

        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)

    def get_tubid(self, rref):
        return rref.getRemoteTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add this client to the set we poll (it would pollute the
            # data), and we don't care about its disconnection
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()

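# A minimal concrete gatherer (sketch) only needs to supply got_stats();
# StdOutStatsGatherer below is essentially this, plus connect/disconnect
# announcements:
#
#     class PrintingStatsGatherer(StatsGatherer):
#         def got_stats(self, stats, tubid, nickname):
#             print nickname, stats
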
class StdOutStatsGatherer(StatsGatherer):
    verbose = True
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print 'connect "%s" [%s]' % (nickname, tubid)
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
        pprint.pprint(stats)

class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications

    def __init__(self, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
            f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        # write to a temporary file and rename it into place, so a crash
        # mid-write never leaves a truncated pickle behind. The unlink is
        # for platforms (e.g. Windows) where os.rename() will not replace
        # an existing file.
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        pickle.dump(self.gathered_stats, f)
        f.close()
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)

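# Sketch of reading the gathered data back from another process (path
# and key usage as written by got_stats() above):
#
#     f = open("stats.pickle", "rb")
#     gathered = pickle.load(f)
#     f.close()
#     for tubid, s in gathered.items():
#         print s['nickname'], s['timestamp'], s['stats']
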
class StatsGathererService(service.MultiService):
    furl_file = "stats_gatherer.furl"

    def __init__(self, basedir=".", verbose=False):
        service.MultiService.__init__(self)
        self.basedir = basedir
        self.tub = Tub(certFile=os.path.join(self.basedir,
                                             "stats_gatherer.pem"))
        self.tub.setServiceParent(self)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)

        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
        self.stats_gatherer.setServiceParent(self)

        portnumfile = os.path.join(self.basedir, "portnum")
        try:
            portnum = open(portnumfile, "r").read()
        except EnvironmentError:
            portnum = None
        self.listener = self.tub.listenOn(portnum or "tcp:0")
        d = self.tub.setLocationAutomatically()
        if portnum is None:
            d.addCallback(self.save_portnum)
        d.addCallback(self.tub_ready)
        d.addErrback(log.err)

    def save_portnum(self, junk):
        portnum = self.listener.getPortnum()
        portnumfile = os.path.join(self.basedir, 'portnum')
        open(portnumfile, 'wb').write('%d\n' % (portnum,))

    def tub_ready(self, ignored):
        ff = os.path.join(self.basedir, self.furl_file)
        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
                                                        furlFile=ff)
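
# Sketch of a .tac file (hypothetical, not part of this module) that runs
# the gatherer under twistd; twistd loads `application`, which starts the
# MultiService, brings up the Tub, and writes stats_gatherer.furl:
#
#     from twisted.application import service
#     from allmydata.stats import StatsGathererService
#     application = service.Application("stats_gatherer")
#     StatsGathererService(verbose=True).setServiceParent(application)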