6 from collections import deque
8 from twisted.internet import reactor
9 from twisted.application import service
10 from twisted.application.internet import TimerService
11 from zope.interface import implements
12 from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
14 from allmydata.util import log
15 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
class LoadMonitor(service.MultiService):
    """Measures reactor scheduling delay and reports it through the
    IStatsProducer interface as 'load_monitor.*' stats.

    NOTE(review): this chunk is elided -- the 'loop()' and 'get_stats()'
    method headers and several attribute initializations (loop_interval,
    num_samples, last, stats) are missing from this view.
    """
    implements(IStatsProducer)

    def __init__(self, provider, warn_if_delay_exceeds=1):
        # provider: the owning StatsProvider
        # warn_if_delay_exceeds: log when the measured reactor delay
        # (seconds) exceeds this threshold
        service.MultiService.__init__(self)
        self.provider = provider
        self.warn_if_delay_exceeds = warn_if_delay_exceeds

    def startService(self):
        # schedule the first delay-measurement pass, then start children
        self.timer = reactor.callLater(self.loop_interval, self.loop)
        service.MultiService.startService(self)

    def stopService(self):
        return service.MultiService.stopService(self)

    # NOTE(review): the lines below belong to the elided loop() and
    # get_stats() methods; their 'def' headers (and some continuation
    # lines, e.g. the rest of the log.msg() call) are missing from view.
        if self.last is not None:
            # how much later than requested did the timer actually fire?
            delay = now - self.last - self.loop_interval
            if delay > self.warn_if_delay_exceeds:
                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
            self.stats.append(delay)
        while len(self.stats) > self.num_samples:
        self.timer = reactor.callLater(self.loop_interval, self.loop)
        avg = sum(self.stats) / len(self.stats)
        return { 'load_monitor.avg_load': avg,
                 'load_monitor.max_load': m_x, }
class CPUUsageMonitor(service.MultiService):
    """Tracks this process's CPU usage and reports 1/5/15-minute moving
    averages plus a running total as 'cpu_monitor.*' stats.

    NOTE(review): this chunk is elided -- the '__init__', 'check' and
    'get_stats' method headers and the POLL_INTERVAL / HISTORY_LENGTH
    constants are missing from this view.
    """
    implements(IStatsProducer)

    # NOTE(review): the following lines are the body of the elided
    # __init__() method.
    service.MultiService.__init__(self)
    # we don't use time.clock() here, because the constructor is run by
    # the twistd parent process (as it loads the .tac file), whereas the
    # rest of the program will be run by the child process, after twistd
    # forks. Instead, set self.initial_cpu as soon as the reactor starts
    self.initial_cpu = 0.0 # just in case
    eventually(self._set_initial_cpu)
    # we provide 1min, 5min, and 15min moving averages
    TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)

    def _set_initial_cpu(self):
        # runs via eventually(), i.e. once the reactor is going in the
        # forked child, so time.clock() reflects this process's CPU time
        self.initial_cpu = time.clock()

    # NOTE(review): the following lines are the body of the elided
    # check() method (periodic sampler).
    now_wall = time.time()
    now_cpu = time.clock()
    self.samples.append( (now_wall, now_cpu) )
    while len(self.samples) > self.HISTORY_LENGTH+1:

    def _average_N_minutes(self, size):
        # Return the fraction of CPU consumed over roughly the last
        # 'size' minutes of samples.
        # NOTE(review): the early-return for too-few samples and the
        # computation of 'first' are elided from this view.
        if len(self.samples) < size+1:
        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
        fraction = elapsed_cpu / elapsed_wall

    # NOTE(review): the following lines are the body of the elided
    # get_stats() method; 's' is presumably the stats dict it returns.
    avg = self._average_N_minutes(1)
    s["cpu_monitor.1min_avg"] = avg
    avg = self._average_N_minutes(5)
    s["cpu_monitor.5min_avg"] = avg
    avg = self._average_N_minutes(15)
    s["cpu_monitor.15min_avg"] = avg
    now_cpu = time.clock()
    s["cpu_monitor.total"] = now_cpu - self.initial_cpu
class StatsProvider(Referenceable, service.MultiService):
    """Aggregates counters and stats from registered IStatsProducers and
    publishes them to a remote stats gatherer via foolscap
    (RIStatsProvider).

    NOTE(review): this chunk is elided -- some __init__ lines (e.g. the
    'self.node'/'self.counters' initializations) and the 'get_stats'
    method header are missing from this view.
    """
    implements(RIStatsProvider)

    def __init__(self, node, gatherer_furl):
        # node: the owning node; gatherer_furl may be None (no gatherer)
        service.MultiService.__init__(self)
        self.gatherer_furl = gatherer_furl # might be None
        self.stats_producers = []
        # only run the LoadMonitor (which submits a timer every second) if
        # there is a gatherer who is going to be paying attention. Our stats
        # are visible through HTTP even without a gatherer, so run the rest
        # of the stats (including the once-per-minute CPUUsageMonitor)
        self.load_monitor = LoadMonitor(self)
        self.load_monitor.setServiceParent(self)
        self.register_producer(self.load_monitor)
        self.cpu_monitor = CPUUsageMonitor()
        self.cpu_monitor.setServiceParent(self)
        self.register_producer(self.cpu_monitor)

    def startService(self):
        # connect to the gatherer only when one is configured; per the
        # note above, stats remain visible over HTTP without one
        if self.node and self.gatherer_furl:
            d = self.node.when_tub_ready()
            # NOTE(review): the 'def connect(ignored):' header for the
            # following callback body is elided from this view
            nickname_utf8 = self.node.nickname.encode("utf-8")
            self.node.tub.connectTo(self.gatherer_furl,
                                    self._connected, nickname_utf8)
            d.addCallback(connect)
        service.MultiService.startService(self)

    def count(self, name, delta=1):
        # increment the named counter by delta (creating it at 0 first)
        val = self.counters.setdefault(name, 0)
        self.counters[name] = val + delta

    def register_producer(self, stats_producer):
        # adapt to IStatsProducer so get_stats() can rely on .get_stats()
        self.stats_producers.append(IStatsProducer(stats_producer))

    # NOTE(review): the 'def get_stats(self):' header and the 'stats'
    # dict initialization are elided from this view
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)

    def remote_get_stats(self):
        # foolscap entry point: remote gatherers fetch the same dict
        return self.get_stats()

    def _connected(self, gatherer, nickname):
        # fire-and-forget announcement; we don't wait for a response
        gatherer.callRemoteOnly('provide', self, nickname or '')
class StatsGatherer(Referenceable, service.MultiService):
    """Collects stats from remote StatsProvider clients by polling them
    over foolscap (RIStatsGatherer). Subclasses must implement
    got_stats() to consume each snapshot.

    NOTE(review): this chunk is elided -- the 'poll_interval' constant,
    the clients/nicknames dict initializations, and the 'poll' method
    header are missing from this view.
    """
    implements(RIStatsGatherer)

    def __init__(self, basedir):
        service.MultiService.__init__(self)
        self.basedir = basedir
        # poll every connected client on a fixed interval
        self.timer = TimerService(self.poll_interval, self.poll)
        self.timer.setServiceParent(self)

    def get_tubid(self, rref):
        # clients are keyed by the TubID of their foolscap connection
        return rref.getRemoteTubID()

    def remote_provide(self, provider, nickname):
        # foolscap entry point: a StatsProvider announces itself to us
        tubid = self.get_tubid(provider)
        if tubid == '<unauth>':
            print "WARNING: failed to get tubid for %s (%s)" % (provider, nickname)
            # don't add to clients to poll (polluting data) don't care about disconnect
        self.clients[tubid] = provider
        self.nicknames[tubid] = nickname

    # NOTE(review): the 'def poll(self):' header is elided from this view
        for tubid,client in self.clients.items():
            nickname = self.nicknames.get(tubid)
            d = client.callRemote('get_stats')
            d.addCallbacks(self.got_stats, self.lost_client,
                           callbackArgs=(tubid, nickname),
                           errbackArgs=(tubid,))
            d.addErrback(self.log_client_error, tubid)

    def lost_client(self, f, tubid):
        # this is called lazily, when a get_stats request fails
        del self.clients[tubid]
        del self.nicknames[tubid]
        # re-raise anything other than the expected dead-reference error
        f.trap(DeadReferenceError)

    def log_client_error(self, f, tubid):
        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
                level=log.UNUSUAL, failure=f)

    def got_stats(self, stats, tubid, nickname):
        # subclass responsibility: handle one client's stats snapshot
        raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
    """StatsGatherer that prints connect/disconnect events and each stats
    snapshot to stdout.

    NOTE(review): this chunk is elided -- a 'verbose' flag and the
    stats pretty-printing lines of got_stats() appear to be missing
    from this view; the prints below may originally sit under a
    'if self.verbose:' guard -- verify against the full file.
    """
    def remote_provide(self, provider, nickname):
        tubid = self.get_tubid(provider)
        print 'connect "%s" [%s]' % (nickname, tubid)
        # announce when this client's connection goes away
        provider.notifyOnDisconnect(self.announce_lost_client, tubid)
        StatsGatherer.remote_provide(self, provider, nickname)

    def announce_lost_client(self, tubid):
        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)

    def got_stats(self, stats, tubid, nickname):
        print '"%s" [%s]:' % (nickname, tubid)
class PickleStatsGatherer(StdOutStatsGatherer):
    # inherit from StdOutStatsGatherer for connect/disconnect notifications
    """Gatherer that keeps each client's latest stats in a dict and
    persists it to <basedir>/stats.pickle.

    NOTE(review): this chunk is elided -- the pickle-load cleanup/else
    branch, the per-snapshot stats storage in got_stats(), and the temp
    file open/close in dump_pickle() are missing from this view.
    """
    def __init__(self, basedir=".", verbose=True):
        self.verbose = verbose
        StatsGatherer.__init__(self, basedir)
        self.picklefile = os.path.join(basedir, "stats.pickle")
        # reload previously gathered stats, if a pickle already exists
        if os.path.exists(self.picklefile):
            f = open(self.picklefile, 'rb')
            self.gathered_stats = pickle.load(f)
        # NOTE(review): this line originally sits under an elided 'else:'
        # branch (fresh start when no pickle exists) -- verify
        self.gathered_stats = {}

    def got_stats(self, stats, tubid, nickname):
        # remember the latest snapshot per client, stamped with wall time
        s = self.gathered_stats.setdefault(tubid, {})
        s['timestamp'] = time.time()
        s['nickname'] = nickname

    def dump_pickle(self):
        # write to a sibling temp file, then swap it into place; the
        # open() of 'f' for the temp file is elided from this view
        tmp = "%s.tmp" % (self.picklefile,)
        pickle.dump(self.gathered_stats, f)
        if os.path.exists(self.picklefile):
            os.unlink(self.picklefile)
        os.rename(tmp, self.picklefile)
class StatsGathererService(service.MultiService):
    """Top-level service wrapping a PickleStatsGatherer in its own Tub.

    NOTE(review): this class continues past the end of this view; the
    tail of tub_ready() (and possibly more) is missing.
    """
    # the gatherer's furl is published in this file under basedir
    furl_file = "stats_gatherer.furl"

    def __init__(self, basedir=".", verbose=False):
        service.MultiService.__init__(self)
        self.basedir = basedir
        # persistent certificate keeps the Tub identity (and hence the
        # published furl) stable across restarts
        self.tub = Tub(certFile=os.path.join(self.basedir,
                                             "stats_gatherer.pem"))
        self.tub.setServiceParent(self)
        self.tub.setOption("logLocalFailures", True)
        self.tub.setOption("logRemoteFailures", True)
        self.tub.setOption("expose-remote-exception-types", False)
        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
        self.stats_gatherer.setServiceParent(self)
        # reuse a previously saved port number when one exists, falling
        # back to an OS-assigned port ("tcp:0")
        portnumfile = os.path.join(self.basedir, "portnum")
        # NOTE(review): the 'try:' line and the except-branch body for
        # this read are elided from this view
            portnum = open(portnumfile, "r").read()
        except EnvironmentError:
        self.listener = self.tub.listenOn(portnum or "tcp:0")
        d = self.tub.setLocationAutomatically()
        d.addCallback(self.save_portnum)
        d.addCallback(self.tub_ready)
        d.addErrback(log.err)

    def save_portnum(self, junk):
        # persist the allocated port so the next run can reuse it
        portnum = self.listener.getPortnum()
        portnumfile = os.path.join(self.basedir, 'portnum')
        open(portnumfile, 'wb').write('%d\n' % (portnum,))

    def tub_ready(self, ignored):
        # publish the gatherer's furl (method continues past this view)
        ff = os.path.join(self.basedir, self.furl_file)
        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,