6 from collections import deque
8 from twisted.internet import reactor
9 from twisted.application import service
10 from twisted.application.internet import TimerService
11 from zope.interface import implements
12 from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
14 from allmydata.util import log
15 from allmydata.util.encodingutil import quote_output
16 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
class LoadMonitor(service.MultiService):
    """Measure reactor responsiveness.

    Schedules a timer to fire once per 'loop_interval' seconds and records
    how much later than requested it actually fires. That excess delay is a
    rough proxy for reactor load: a busy or blocked reactor services timers
    late. Keeps the most recent 'num_samples' delay measurements and reports
    their average and maximum through get_stats().
    """
    implements(IStatsProducer)

    loop_interval = 1   # seconds between samples
    num_samples = 60    # retain one minute of delay history

    def __init__(self, provider, warn_if_delay_exceeds=1):
        """'provider' is the owning StatsProvider; delays greater than
        'warn_if_delay_exceeds' seconds are logged at UNUSUAL severity."""
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds
        self.started = False
        self.last = None      # wall-clock time of the previous loop() call
        self.stats = deque()  # recent delay samples, newest at the right
        self.timer = None     # pending IDelayedCall, or None

    def startService(self):
        # guard against double-start: only schedule the loop once
        if not self.started:
            self.started = True
            self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        # cancel any pending timer so we don't leak a callLater (and so the
        # reactor can shut down cleanly)
        self.started = False
        if self.timer:
            self.timer.cancel()
            self.timer = None
        return service.MultiService.stopService(self)

    def loop(self):
        self.timer = None
        if not self.started:
            return
        now = time.time()
        if self.last is not None:
            # 'delay' is how much later than the requested loop_interval the
            # timer actually fired
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
                        level=log.UNUSUAL)
            self.stats.append(delay)
            while len(self.stats) > self.num_samples:
                self.stats.popleft()

        self.last = now
        self.timer = reactor.callLater(self.loop_interval, self.loop)

    def get_stats(self):
        """Return {'load_monitor.avg_load', 'load_monitor.max_load'}.

        Both values are 0 until at least one sample has been collected,
        avoiding a ZeroDivisionError on an empty deque.
        """
        if self.stats:
            avg = sum(self.stats) / len(self.stats)
            m_x = max(self.stats)
        else:
            avg = m_x = 0
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }
class CPUUsageMonitor(service.MultiService):
    """Track this process's CPU consumption.

    Samples (wall-clock, cpu-clock) once per POLL_INTERVAL seconds and
    reports the fraction of CPU used over trailing 1/5/15 minute windows,
    plus the total CPU consumed since the reactor started.
    """
    implements(IStatsProducer)
    HISTORY_LENGTH = 15  # minutes of samples to retain (longest window)
    POLL_INTERVAL = 60   # seconds between samples

    def __init__(self):
        service.MultiService.__init__(self)
        # we don't use time.clock() here, because the constructor is run by
        # the twistd parent process (as it loads the .tac file), whereas the
        # rest of the program will be run by the child process, after twistd
        # forks. Instead, set self.initial_cpu as soon as the reactor starts
        # up.
        self.initial_cpu = 0.0 # just in case
        eventually(self._set_initial_cpu)
        self.samples = []  # (wall-clock, cpu-clock) pairs, newest last
        # we provide 1min, 5min, and 15min moving averages
        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        # runs via eventually(), i.e. after the reactor has started in the
        # (possibly forked) child process
        self.initial_cpu = time.clock()

    def check(self):
        """Record one sample, trimming history beyond HISTORY_LENGTH+1
        entries (we need N+1 samples to compute an N-minute average)."""
        now_wall = time.time()
        now_cpu = time.clock()
        self.samples.append( (now_wall, now_cpu) )
        while len(self.samples) > self.HISTORY_LENGTH+1:
            self.samples.pop(0)

    def _average_N_minutes(self, size):
        """Return the fraction of CPU used over the trailing 'size' minutes,
        or None if not enough samples have accumulated yet."""
        if len(self.samples) < size+1:
            return None
        first = -size-1
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall
        return fraction

    def get_stats(self):
        """Return a dict of cpu_monitor.* stats; the windowed averages are
        omitted until enough history exists to compute them."""
        s = {}
        avg = self._average_N_minutes(1)
        if avg is not None:
            s["cpu_monitor.1min_avg"] = avg
        avg = self._average_N_minutes(5)
        if avg is not None:
            s["cpu_monitor.5min_avg"] = avg
        avg = self._average_N_minutes(15)
        if avg is not None:
            s["cpu_monitor.15min_avg"] = avg
        now_cpu = time.clock()
        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
        return s
class StatsProvider(Referenceable, service.MultiService):
    """Collect stats from registered producers and serve them.

    Stats are available locally via get_stats() (used by the HTTP status
    pages) and remotely via the RIStatsProvider interface. If a gatherer
    FURL is configured, we connect to the gatherer and offer ourselves to
    it once our Tub is ready.
    """
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        service.MultiService.__init__(self)
        self.node = node
        self.gatherer_furl = gatherer_furl # might be None

        self.counters = {}
        self.stats_producers = []

        # only run the LoadMonitor (which submits a timer every second) if
        # there is a gatherer who is going to be paying attention. Our stats
        # are visible through HTTP even without a gatherer, so run the rest
        # of the stats (including the once-per-minute CPUUsageMonitor)
        # unconditionally.
        if gatherer_furl:
            self.load_monitor = LoadMonitor(self)
            self.load_monitor.setServiceParent(self)
            self.register_producer(self.load_monitor)

        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        # once the Tub is ready, connect to the gatherer (reconnecting
        # automatically if the connection is lost)
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            def connect(junk):
                nickname_utf8 = self.node.nickname.encode("utf-8")
                self.node.tub.connectTo(self.gatherer_furl,
                                        self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        """Increment the named counter by 'delta' (creating it at 0)."""
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        # adapt to IStatsProducer so non-conforming objects fail early
        self.stats_producers.append(IStatsProducer(stats_producer))

    def get_stats(self):
        """Return {'counters': {...}, 'stats': {...}} merged from all
        registered producers."""
        stats = {}
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
        return ret

    def remote_get_stats(self):
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        # fire-and-forget: we don't care whether the gatherer acks
        gatherer.callRemoteOnly('provide', self, nickname or '')
class StatsGatherer(Referenceable, service.MultiService):
    """Abstract gatherer: polls every known StatsProvider periodically.

    Providers announce themselves via remote 'provide' calls; we then poll
    each one every 'poll_interval' seconds with get_stats(). Subclasses
    must implement got_stats() to consume the results.
    """
    implements(RIStatsGatherer)

    poll_interval = 60  # seconds between polling rounds

    def __init__(self, basedir):
        service.MultiService.__init__(self)
        self.basedir = basedir

        self.clients = {}    # tubid -> remote provider reference
        self.nicknames = {}  # tubid -> nickname announced by the provider

        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)

    def get_tubid(self, rref):
        return rref.getRemoteTubID()

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print("WARNING: failed to get tubid for %s (%s)" % (provider, nickname))
            # don't add to clients to poll (polluting data) don't care about disconnect
            return
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    def poll(self):
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            # anything lost_client doesn't trap is logged, not dropped
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        # re-raise anything other than a dead reference so it reaches
        # log_client_error below
        f.trap(DeadReferenceError)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
    """Gatherer that prints connect/disconnect events and every stats
    report to stdout. Set 'verbose' to False to silence the
    connect/disconnect announcements."""
    verbose = True

    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        if self.verbose:
            print('connect "%s" [%s]' % (nickname, tubid))
            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print('disconnect "%s" [%s]' % (self.nicknames[tubid], tubid))

    def got_stats(self, stats, tubid, nickname):
        print('"%s" [%s]:' % (nickname, tubid))
        pprint.pprint(stats)
class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications
    """Gatherer that persists all received stats to basedir/stats.pickle,
    rewriting the pickle atomically after every report."""

    def __init__(self, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")

        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            try:
                self.gathered_stats = pickle.load(f)
            except Exception:
                # a corrupt pickle is fatal; tell the operator how to
                # recover before re-raising
                print ("Error while attempting to load pickle file %s.\nYou may need to delete this file.\n" %
                       quote_output(os.path.abspath(self.picklefile)))
                raise
            finally:
                f.close()
        else:
            self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname
        s['stats'] = stats
        self.dump_pickle()

    def dump_pickle(self):
        # write to a temp file, then rename into place, so a crash mid-write
        # never leaves a truncated stats.pickle behind
        tmp = "%s.tmp" % (self.picklefile,)
        f = open(tmp, 'wb')
        try:
            pickle.dump(self.gathered_stats, f)
        finally:
            f.close()
        if os.path.exists(self.picklefile):
            # needed on Windows, where rename() won't overwrite
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)
282 class StatsGathererService(service.MultiService):
283 furl_file = "stats_gatherer.furl"
285 def __init__(self, basedir=".", verbose=False):
286 service.MultiService.__init__(self)
287 self.basedir = basedir
288 self.tub = Tub(certFile=os.path.join(self.basedir,
289 "stats_gatherer.pem"))
290 self.tub.setServiceParent(self)
291 self.tub.setOption("logLocalFailures", True)
292 self.tub.setOption("logRemoteFailures", True)
293 self.tub.setOption("expose-remote-exception-types", False)
295 self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
296 self.stats_gatherer.setServiceParent(self)
298 portnumfile = os.path.join(self.basedir, "portnum")
300 portnum = open(portnumfile, "r").read()
301 except EnvironmentError:
303 self.listener = self.tub.listenOn(portnum or "tcp:0")
304 d = self.tub.setLocationAutomatically()
306 d.addCallback(self.save_portnum)
307 d.addCallback(self.tub_ready)
308 d.addErrback(log.err)
310 def save_portnum(self, junk):
311 portnum = self.listener.getPortnum()
312 portnumfile = os.path.join(self.basedir, 'portnum')
313 open(portnumfile, 'wb').write('%d\n' % (portnum,))
315 def tub_ready(self, ignored):
316 ff = os.path.join(self.basedir, self.furl_file)
317 self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,