import os
import pickle
import time
from collections import deque

from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub

from allmydata.util import log
from allmydata.util.encodingutil import quote_output
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
# LoadMonitor: IStatsProducer that measures how late the reactor fires a
# once-per-interval timer, as a proxy for event-loop load.
# NOTE(review): this listing embeds original line numbers and has dropped
# lines (numbering jumps 19->24 and 27->33); indentation was also stripped.
# The attributes initialized in the dropped lines (loop interval, sample
# deque, last-fire timestamp) are not visible here -- confirm against the
# full source.
18 class LoadMonitor(service.MultiService):
19 implements(IStatsProducer)
# __init__: `provider` is the owning stats provider; `warn_if_delay_exceeds`
# is the delay in seconds above which a warning is logged.
24 def __init__(self, provider, warn_if_delay_exceeds=1):
25 service.MultiService.__init__(self)
26 self.provider = provider
27 self.warn_if_delay_exceeds = warn_if_delay_exceeds
# startService: schedule the first delay measurement, then start child
# services.  NOTE(review): numbering jumps 33->36; the dropped lines
# presumably reset loop state before scheduling -- confirm.
33 def startService(self):
36 self.timer = reactor.callLater(self.loop_interval, self.loop)
37 service.MultiService.startService(self)
# stopService: NOTE(review): original lines 40-43 (presumably cancelling
# the pending timer) are missing from this listing.
39 def stopService(self):
44 return service.MultiService.stopService(self)
# Fragment of loop(): computes how much later than loop_interval the
# reactor actually fired us, warns if it exceeds the threshold, records
# the sample, and reschedules itself.
# NOTE(review): the `def loop(self):` line and several interior lines are
# missing from this listing; the log.msg() call on original line 54 is cut
# off mid-argument-list (line 55 dropped).
51 if self.last is not None:
52 delay = now - self.last - self.loop_interval
53 if delay > self.warn_if_delay_exceeds:
54 log.msg(format='excessive reactor delay (%ss)', args=(delay,),
# keep only the most recent num_samples measurements (trim body dropped)
56 self.stats.append(delay)
57 while len(self.stats) > self.num_samples:
# reschedule the next measurement
61 self.timer = reactor.callLater(self.loop_interval, self.loop)
# Fragment of LoadMonitor.get_stats(): reports average and maximum
# recorded reactor delay.
# NOTE(review): the def line and the computation of m_x (presumably
# max(self.stats)) are missing from this listing.
65 avg = sum(self.stats) / len(self.stats)
69 return { 'load_monitor.avg_load': avg,
70 'load_monitor.max_load': m_x, }
# CPUUsageMonitor: IStatsProducer that samples process CPU time on a
# timer and exposes 1/5/15-minute usage averages.
# NOTE(review): the class attributes (POLL_INTERVAL, HISTORY_LENGTH), the
# `def __init__` line, and the samples-list initialization are missing
# from this listing (numbering jumps 73->78 and 85->87).
72 class CPUUsageMonitor(service.MultiService):
73 implements(IStatsProducer)
78 service.MultiService.__init__(self)
79 # we don't use time.clock() here, because the constructor is run by
80 # the twistd parent process (as it loads the .tac file), whereas the
81 # rest of the program will be run by the child process, after twistd
82 # forks. Instead, set self.initial_cpu as soon as the reactor starts
84 self.initial_cpu = 0.0 # just in case
85 eventually(self._set_initial_cpu)
87 # we provide 1min, 5min, and 15min moving averages
88 TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)
def _set_initial_cpu(self):
    """Record the current process CPU time as the measurement baseline.

    Scheduled via eventually() from __init__ so it runs after the
    reactor has started, i.e. in the forked child process rather than
    the twistd parent that loaded the .tac file.
    """
    self.initial_cpu = time.clock()
# Fragment of check(): append a (wall-clock, cpu-clock) sample pair and
# trim history.
# NOTE(review): the `def check(self):` line and the trimming loop's body
# (presumably self.samples.pop(0)) are missing from this listing.
94 now_wall = time.time()
95 now_cpu = time.clock()
96 self.samples.append( (now_wall, now_cpu) )
97 while len(self.samples) > self.HISTORY_LENGTH+1:
# _average_N_minutes: CPU usage over the last `size` samples, as the
# ratio of elapsed CPU time to elapsed wall time across that window.
# NOTE(review): the early-return for too-few samples, the computation of
# `first`, and the final return statement are missing from this listing.
100 def _average_N_minutes(self, size):
101 if len(self.samples) < size+1:
104 elapsed_wall = self.samples[-1][0] - self.samples[first][0]
105 elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
106 fraction = elapsed_cpu / elapsed_wall
# Fragment of CPUUsageMonitor.get_stats(): reports the 1/5/15-minute
# averages plus total CPU consumed since the baseline set by
# _set_initial_cpu.
# NOTE(review): the def line, the result-dict initialization, and the
# guards between each average and its store (presumably `if avg is not
# None:`, given the numbering gaps 111->113 etc.) are missing here.
111 avg = self._average_N_minutes(1)
113 s["cpu_monitor.1min_avg"] = avg
114 avg = self._average_N_minutes(5)
116 s["cpu_monitor.5min_avg"] = avg
117 avg = self._average_N_minutes(15)
119 s["cpu_monitor.15min_avg"] = avg
120 now_cpu = time.clock()
121 s["cpu_monitor.total"] = now_cpu - self.initial_cpu
# StatsProvider: per-node service that aggregates counters and registered
# stats producers, and (optionally) reports to a remote stats gatherer.
# NOTE(review): several __init__ lines are missing from this listing,
# including the self.node assignment, the counters-dict setup, and the
# `if gatherer_furl:` guard that the comment below refers to.
125 class StatsProvider(Referenceable, service.MultiService):
126 implements(RIStatsProvider)
128 def __init__(self, node, gatherer_furl):
129 service.MultiService.__init__(self)
131 self.gatherer_furl = gatherer_furl # might be None
134 self.stats_producers = []
136 # only run the LoadMonitor (which submits a timer every second) if
137 # there is a gatherer who is going to be paying attention. Our stats
138 # are visible through HTTP even without a gatherer, so run the rest
139 # of the stats (including the once-per-minute CPUUsageMonitor)
141 self.load_monitor = LoadMonitor(self)
142 self.load_monitor.setServiceParent(self)
143 self.register_producer(self.load_monitor)
145 self.cpu_monitor = CPUUsageMonitor()
146 self.cpu_monitor.setServiceParent(self)
147 self.register_producer(self.cpu_monitor)
# startService: once the node's tub is ready, connect to the configured
# gatherer FURL and announce ourselves (via _connected).
# NOTE(review): the `def connect(ignored):` line introducing the callback
# body is missing from this listing (numbering jumps 151->153), so as
# shown the `connect` name passed to addCallback is unbound.
149 def startService(self):
150 if self.node and self.gatherer_furl:
151 d = self.node.when_tub_ready()
153 nickname_utf8 = self.node.nickname.encode("utf-8")
154 self.node.tub.connectTo(self.gatherer_furl,
155 self._connected, nickname_utf8)
156 d.addCallback(connect)
157 service.MultiService.startService(self)
def count(self, name, delta=1):
    """Increment the named counter by delta, starting it at zero if new."""
    self.counters[name] = self.counters.get(name, 0) + delta
def register_producer(self, stats_producer):
    """Adapt stats_producer to IStatsProducer and track it for get_stats()."""
    producer = IStatsProducer(stats_producer)
    self.stats_producers.append(producer)
# Fragment of get_stats(): merge every registered producer's stats and
# bundle them alongside the counters.
# NOTE(review): the def line, the `stats = {}` initialization, and the
# final `return ret` are missing from this listing.
168 for sp in self.stats_producers:
169 stats.update(sp.get_stats())
170 ret = { 'counters': self.counters, 'stats': stats }
171 log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
def remote_get_stats(self):
    """Foolscap-exposed entry point: delegate to the local get_stats()."""
    result = self.get_stats()
    return result
def _connected(self, gatherer, nickname):
    """Announce ourselves to a freshly-connected stats gatherer.

    Fire-and-forget: callRemoteOnly ignores the result, and a missing
    nickname is sent as the empty string.
    """
    gatherer.callRemoteOnly('provide', self, nickname or '')
# StatsGatherer: foolscap service that stats providers register with;
# polls each registered provider for its stats on a timer.
# NOTE(review): the poll_interval class attribute and the clients/
# nicknames dict initialization are missing from this listing (numbering
# jumps 182->186 and 188->193).
181 class StatsGatherer(Referenceable, service.MultiService):
182 implements(RIStatsGatherer)
186 def __init__(self, basedir):
187 service.MultiService.__init__(self)
188 self.basedir = basedir
193 self.timer = TimerService(self.poll_interval, self.poll)
194 self.timer.setServiceParent(self)
def get_tubid(self, rref):
    """Return the remote peer's tub ID, used as the key for this client."""
    return rref.getRemoteTubID()
# remote_provide: a provider announces itself; remember it for polling,
# unless its tub ID could not be authenticated.
# NOTE(review): the early `return` that the comment on original line 203
# describes appears to be missing from this listing (numbering jumps
# 203->205); as shown, unauthenticated providers would be added anyway.
199 def remote_provide(self, provider, nickname):
200 tubid = self.get_tubid(provider)
201 if tubid == '<unauth>':
202 print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
203 # don't add to clients to poll (polluting data) don't care about disconnect
205 self.clients[tubid] = provider
206 self.nicknames[tubid] = nickname
# Fragment of poll(): ask every registered client for its stats.
# NOTE(review): the `def poll(self):` line is missing from this listing.
209 for tubid,client in self.clients.items():
210 nickname = self.nicknames.get(tubid)
211 d = client.callRemote('get_stats')
# success -> got_stats; DeadReferenceError -> lost_client; any other
# failure falls through to log_client_error.
212 d.addCallbacks(self.got_stats, self.lost_client,
213 callbackArgs=(tubid, nickname),
214 errbackArgs=(tubid,))
215 d.addErrback(self.log_client_error, tubid)
def lost_client(self, f, tubid):
    """Errback for a failed get_stats() poll: forget the client.

    Cleanup happens lazily, only when a poll actually fails.  The
    client is dropped before trapping, so it is removed even for
    unexpected failure types; anything other than DeadReferenceError
    is re-raised by trap() and lands in log_client_error.
    """
    self.clients.pop(tubid)
    self.nicknames.pop(tubid)
    f.trap(DeadReferenceError)
def log_client_error(self, f, tubid):
    """Log an unexpected failure from polling the given peer's stats."""
    message = "StatsGatherer: error in get_stats(), peerid=%s" % tubid
    log.msg(message, level=log.UNUSUAL, failure=f)
def got_stats(self, stats, tubid, nickname):
    """Handle one successful poll result.  Subclasses must override."""
    raise NotImplementedError()
# StdOutStatsGatherer: StatsGatherer that reports connects, disconnects,
# and polled stats on stdout.
# NOTE(review): a line is missing between original lines 233 and 235 --
# presumably a verbosity guard around the print -- confirm against the
# full source.
230 class StdOutStatsGatherer(StatsGatherer):
232 def remote_provide(self, provider, nickname):
233 tubid = self.get_tubid(provider)
235 print 'connect "%s" [%s]' % (nickname, tubid)
# report when this provider's connection goes away, then register as usual
236 provider.notifyOnDisconnect(self.announce_lost_client, tubid)
237 StatsGatherer.remote_provide(self, provider, nickname)
239 def announce_lost_client(self, tubid):
240 print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
# got_stats: print one provider's polled stats on stdout.
# NOTE(review): the loop that prints the individual stats entries
# (original lines 244-245) is missing from this listing.
242 def got_stats(self, stats, tubid, nickname):
243 print '"%s" [%s]:' % (nickname, tubid)
# PickleStatsGatherer: persists the latest stats from each provider to
# basedir/stats.pickle.
# NOTE(review): the try/except presumably wrapping pickle.load, the file
# close, and the branch structure around the error message are missing
# from this listing (numbering jumps 255->257->259 and 261->265); as
# shown, the error message would print unconditionally.
246 class PickleStatsGatherer(StdOutStatsGatherer):
247 # inherit from StdOutStatsGatherer for connect/disconnect notifications
249 def __init__(self, basedir=".", verbose=True):
250 self.verbose = verbose
251 StatsGatherer.__init__(self, basedir)
252 self.picklefile = os.path.join(basedir, "stats.pickle")
# reload previously-gathered stats if a pickle already exists
254 if os.path.exists(self.picklefile):
255 f = open(self.picklefile, 'rb')
257 self.gathered_stats = pickle.load(f)
259 print ("Error while attempting to load pickle file %s.\n"
260 "You may need to restore this file from a backup, or delete it if no backup is available.\n" %
261 quote_output(os.path.abspath(self.picklefile)))
265 self.gathered_stats = {}
# got_stats: record the latest snapshot for this provider, keyed by tubid.
# NOTE(review): the lines that store `stats` itself and trigger
# dump_pickle() (original lines 271-272, per the numbering gap) are
# missing from this listing.
267 def got_stats(self, stats, tubid, nickname):
268 s = self.gathered_stats.setdefault(tubid, {})
269 s['timestamp'] = time.time()
270 s['nickname'] = nickname
# dump_pickle: rewrite the pickle file via a temp file plus rename.
# NOTE(review): the open of the temp file (binding `f`) and its close
# (original lines 276 and 278) are missing from this listing.  The
# unlink-before-rename dance suggests support for platforms where
# rename() won't overwrite an existing file -- confirm.
274 def dump_pickle(self):
275 tmp = "%s.tmp" % (self.picklefile,)
277 pickle.dump(self.gathered_stats, f)
279 if os.path.exists(self.picklefile):
280 os.unlink(self.picklefile)
281 os.rename(tmp, self.picklefile)
# StatsGathererService: standalone service that runs a PickleStatsGatherer
# behind its own foolscap Tub, persisting the listening port between runs
# and publishing the gatherer's FURL (see tub_ready).
# NOTE(review): several lines are missing from this listing, including
# the `try:` that must introduce the portnum read (original line 300) and
# the fallback assignment inside the except branch (line 303).
283 class StatsGathererService(service.MultiService):
284 furl_file = "stats_gatherer.furl"
286 def __init__(self, basedir=".", verbose=False):
287 service.MultiService.__init__(self)
288 self.basedir = basedir
289 self.tub = Tub(certFile=os.path.join(self.basedir,
290 "stats_gatherer.pem"))
291 self.tub.setServiceParent(self)
292 self.tub.setOption("logLocalFailures", True)
293 self.tub.setOption("logRemoteFailures", True)
294 self.tub.setOption("expose-remote-exception-types", False)
296 self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
297 self.stats_gatherer.setServiceParent(self)
# reuse the previously-saved port if one was recorded, else any free port
299 portnumfile = os.path.join(self.basedir, "portnum")
301 portnum = open(portnumfile, "r").read()
302 except EnvironmentError:
304 self.listener = self.tub.listenOn(portnum or "tcp:0")
305 d = self.tub.setLocationAutomatically()
307 d.addCallback(self.save_portnum)
308 d.addCallback(self.tub_ready)
309 d.addErrback(log.err)
def save_portnum(self, junk):
    """Persist the tub's actual listening port to basedir/portnum.

    Runs as a callback on setLocationAutomatically(); `junk` is the
    ignored callback argument.  The saved value lets the next run of
    __init__ listen on the same port.
    """
    portnum = self.listener.getPortnum()
    portnumfile = os.path.join(self.basedir, 'portnum')
    # fix: the original used open(...).write(...) without closing, leaking
    # the handle and deferring the flush to GC; close deterministically.
    f = open(portnumfile, 'wb')
    try:
        f.write('%d\n' % (portnum,))
    finally:
        f.close()
316 def tub_ready(self, ignored):
317 ff = os.path.join(self.basedir, self.furl_file)
318 self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,