From 79147a3f6ac47b06128db5dfcf91d679498c8d48 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 7 May 2008 12:34:29 -0700 Subject: [PATCH] misc/cpu-watcher*: add some tools to monitor CPU usage of arbitrary processes, like tahoe nodes --- misc/cpu-watcher-poll.py | 24 ++++ misc/cpu-watcher-subscribe.py | 49 ++++++++ misc/cpu-watcher.tac | 221 ++++++++++++++++++++++++++++++++++ 3 files changed, 294 insertions(+) create mode 100644 misc/cpu-watcher-poll.py create mode 100644 misc/cpu-watcher-subscribe.py create mode 100644 misc/cpu-watcher.tac diff --git a/misc/cpu-watcher-poll.py b/misc/cpu-watcher-poll.py new file mode 100644 index 00000000..68cc8595 --- /dev/null +++ b/misc/cpu-watcher-poll.py @@ -0,0 +1,24 @@ +#! /usr/bin/python + +from foolscap import Tub, eventual +from twisted.internet import reactor +import sys +import pprint + +def oops(f): + print "ERROR" + print f + +def fetch(furl): + t = Tub() + t.startService() + d = t.getReference(furl) + d.addCallback(lambda rref: rref.callRemote("get_averages")) + d.addCallback(pprint.pprint) + return d + +d = eventual.fireEventually(sys.argv[1]) +d.addCallback(fetch) +d.addErrback(oops) +d.addBoth(lambda res: reactor.stop()) +reactor.run() diff --git a/misc/cpu-watcher-subscribe.py b/misc/cpu-watcher-subscribe.py new file mode 100644 index 00000000..4c560e2c --- /dev/null +++ b/misc/cpu-watcher-subscribe.py @@ -0,0 +1,49 @@ +# -*- python -*- + +from twisted.internet import reactor +import sys + +import os.path, pprint +from twisted.application import service +from twisted.python import log +from foolscap import Tub, Referenceable, RemoteInterface +from foolscap.schema import ListOf, TupleOf +from zope.interface import implements + +Averages = ListOf( TupleOf(str, float, float, float) ) +class RICPUWatcherSubscriber(RemoteInterface): + def averages(averages=Averages): + return None + +class CPUWatcherSubscriber(service.MultiService, Referenceable): + implements(RICPUWatcherSubscriber) + def __init__(self, furlthing): + service.MultiService.__init__(self) + if furlthing.startswith("pb://"): + furl = furlthing + else: + furlfile = os.path.expanduser(furlthing) + if os.path.isdir(furlfile): + furlfile = os.path.join(furlfile, "watcher.furl") + furl = open(furlfile, "r").read().strip() + tub = Tub() + tub.setServiceParent(self) + tub.connectTo(furl, self.connected) + + def connected(self, rref): + print "subscribing" + d = rref.callRemote("get_averages") + d.addCallback(self.remote_averages) + d.addErrback(log.err) + + d = rref.callRemote("subscribe", self) + d.addErrback(log.err) + + def remote_averages(self, averages): + pprint.pprint(averages) + + +c = CPUWatcherSubscriber(sys.argv[1]) +c.startService() +reactor.run() + diff --git a/misc/cpu-watcher.tac b/misc/cpu-watcher.tac new file mode 100644 index 00000000..4992a1c4 --- /dev/null +++ b/misc/cpu-watcher.tac @@ -0,0 +1,221 @@ +# -*- python -*- + +""" +# run this tool on a linux box in its own directory, with a file named +# 'pids.txt' describing which processes to watch. It will follow CPU usage of +# the given processes, and compute 1/5/15-minute moving averages for each +# process. These averages can be retrieved from a foolscap connection +# (published at ./watcher.furl), or through an HTTP query (using ./webport). + +# Each line of pids.txt describes a single process. Blank lines and ones that +# begin with '#' are ignored. Each line is either "PID" or "PID NAME" (space +# separated). PID is either a numeric process ID, a pathname to a file that +# contains a process ID, or a pathname to a directory that contains a +# twistd.pid file (which contains a process ID). NAME is an arbitrary string +# that will be used to describe the process to watcher.furl subscribers, and +# defaults to PID if not provided. +""" + +# TODO: +# built-in graphs on web interface + + + +import pickle, os.path, time, pprint +from twisted.application import internet, service, strports +from twisted.web import server, resource, http +from twisted.python import log +import simplejson +from foolscap import Tub, Referenceable, RemoteInterface, eventual +from foolscap.schema import ListOf, TupleOf +from zope.interface import implements + +def read_cpu_times(pid): + data = open("/proc/%d/stat" % pid, "r").read() + data = data.split() + times = data[13:17] + # the values in /proc/%d/stat are in ticks, I think. My system has + # CONFIG_HZ_1000=y in /proc/config.gz but nevertheless the numbers in + # 'stat' appear to be 10ms each. + HZ = 100 + userspace_seconds = int(times[0]) * 1.0 / HZ + system_seconds = int(times[1]) * 1.0 / HZ + child_userspace_seconds = int(times[2]) * 1.0 / HZ + child_system_seconds = int(times[3]) * 1.0 / HZ + return (userspace_seconds, system_seconds) + + +def read_pids_txt(): + processes = [] + for line in open("pids.txt", "r").readlines(): + line = line.strip() + if not line or line[0] == "#": + continue + parts = line.split() + pidthing = parts[0] + if len(parts) > 1: + name = parts[1] + else: + name = pidthing + try: + pid = int(pidthing) + except ValueError: + pidfile = os.path.expanduser(pidthing) + if os.path.isdir(pidfile): + pidfile = os.path.join(pidfile, "twistd.pid") + pid = int(open(pidfile, "r").read().strip()) + processes.append( (pid, name) ) + return processes + +Averages = ListOf( TupleOf(str, float, float, float) ) +class RICPUWatcherSubscriber(RemoteInterface): + def averages(averages=Averages): + return None + +class RICPUWatcher(RemoteInterface): + def get_averages(): + """Return a list of rows, one for each process I am watching. Each + row is (name, 1-min-avg, 5-min-avg, 15-min-avg), where 'name' is a + string, and the averages are floats from 0.0 to 1.0 . Each average is + the percentage of the CPU that this process has used: the change in + CPU time divided by the change in wallclock time. + """ + return Averages + + def subscribe(observer=RICPUWatcherSubscriber): + """Arrange for the given observer to get an 'averages' message every + time the averages are updated. This message will contain a single + argument, the same list of tuples that get_averages() returns.""" + return None + +class CPUWatcher(service.MultiService, resource.Resource, Referenceable): + implements(RICPUWatcher) + POLL_INTERVAL = 30 # seconds + HISTORY_LIMIT = 15 * 60 # 15min + AVERAGES = (1*60, 5*60, 15*60) # 1min, 5min, 15min + + def __init__(self): + service.MultiService.__init__(self) + resource.Resource.__init__(self) + try: + self.history = pickle.load(open("history.pickle", "rb")) + except EnvironmentError: + self.history = {} + self.current = [] + self.observers = set() + ts = internet.TimerService(self.POLL_INTERVAL, self.poll) + ts.setServiceParent(self) + + def startService(self): + service.MultiService.startService(self) + + try: + desired_webport = open("webport", "r").read().strip() + except EnvironmentError: + desired_webport = None + webport = desired_webport or "tcp:0" + root = self + serv = strports.service(webport, server.Site(root)) + serv.setServiceParent(self) + if not desired_webport: + got_port = serv._port.getHost().port + open("webport", "w").write("tcp:%d\n" % got_port) + + self.tub = Tub(certFile="watcher.pem") + self.tub.setServiceParent(self) + try: + desired_tubport = open("tubport", "r").read().strip() + except EnvironmentError: + desired_tubport = None + tubport = desired_tubport or "tcp:0" + l = self.tub.listenOn(tubport) + if not desired_tubport: + got_port = l.getPortnum() + open("tubport", "w").write("tcp:%d\n" % got_port) + d = self.tub.setLocationAutomatically() + d.addCallback(self._tub_ready) + d.addErrback(log.err) + + def _tub_ready(self, res): + self.tub.registerReference(self, furlFile="watcher.furl") + + + def getChild(self, path, req): + if path == "": + return self + return resource.Resource.getChild(self, path, req) + + def render(self, req): + t = req.args.get("t", ["html"])[0] + ctype = "text/plain" + data = "" + if t == "html": + data = "# name, 1min, 5min, 15min\n" + data += pprint.pformat(self.current) + "\n" + elif t == "json": + #data = str(self.current) + "\n" # isn't that convenient? almost. + data = simplejson.dumps(self.current, indent=True) + else: + req.setResponseCode(http.BAD_REQUEST) + data = "Unknown t= %s\n" % t + req.setHeader("content-type", ctype) + return data + + def remote_get_averages(self): + return self.current + def remote_subscribe(self, observer): + self.observers.add(observer) + + def notify(self, observer): + d = observer.callRemote("averages", self.current) + def _error(f): + log.msg("observer error, removing them") + log.msg(f) + self.observers.discard(observer) + d.addErrback(_error) + + def poll(self): + max_history = self.HISTORY_LIMIT / self.POLL_INTERVAL + current = [] + try: + processes = read_pids_txt() + except: + log.err() + return + for (pid, name) in processes: + if pid not in self.history: + self.history[pid] = [] + now = time.time() + try: + (user_seconds, sys_seconds) = read_cpu_times(pid) + self.history[pid].append( (now, user_seconds, sys_seconds) ) + while len(self.history[pid]) > max_history+1: + self.history[pid].pop(0) + except: + log.err() + pickle.dump(self.history, open("history.pickle", "wb")) + for (pid, name) in processes: + row = [name] + for avg in self.AVERAGES: + row.append(self._average_N(pid, avg)) + current.append(tuple(row)) + self.current = current + print current + for ob in self.observers: + eventual.eventually(self.notify, ob) + + def _average_N(self, pid, seconds): + num_samples = seconds / self.POLL_INTERVAL + samples = self.history[pid] + if len(samples) < num_samples+1: + return None + first = -num_samples-1 + elapsed_wall = samples[-1][0] - samples[first][0] + elapsed_user = samples[-1][1] - samples[first][1] + elapsed_sys = samples[-1][2] - samples[first][2] + if elapsed_wall == 0.0: + return 0.0 + return (elapsed_user+elapsed_sys) / elapsed_wall + +application = service.Application("cpu-watcher") +CPUWatcher().setServiceParent(application) -- 2.45.2