# -*- python -*-

"""
Run this tool on a Linux box in its own directory, with a file named
'pids.txt' describing which processes to watch. It will follow CPU usage of
the given processes, and compute 1/5/15-minute moving averages for each
process. These averages can be retrieved over a foolscap connection
(published at ./watcher.furl), or through an HTTP query (using ./webport).

Each line of pids.txt describes a single process. Blank lines and ones that
begin with '#' are ignored. Each line is either "PID" or "PID NAME" (space
separated). PID is either a numeric process ID, a pathname to a file that
contains a process ID, or a pathname to a directory that contains a
twistd.pid file (which contains a process ID). NAME is an arbitrary string
that will be used to describe the process to watcher.furl subscribers, and
defaults to PID if not provided.
"""
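
# A hypothetical pids.txt might look like this (PIDs, paths, and names are
# made up for illustration):
#
#   # one numeric pid with an explicit name, one pidfile, one node directory
#   1234 webserver
#   /var/run/someprocess.pid someprocess
#   ~/tahoe-storage-node storage1
#
# The watcher itself is a .tac file, so it is typically started from this
# directory with "twistd -y cpu-watcher.tac".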

# TODO:
#  built-in graphs on web interface


import pickle, os.path, time, pprint
from twisted.application import internet, service, strports
from twisted.web import server, resource, http
from twisted.python import log
import simplejson
from foolscap import Tub, Referenceable, RemoteInterface, eventual
from foolscap.schema import ListOf, TupleOf
from zope.interface import implements

def read_cpu_times(pid):
    data = open("/proc/%d/stat" % pid, "r").read()
    data = data.split()
    # fields 14-17 of /proc/<pid>/stat (indices 13-16 after split) are
    # utime, stime, cutime, cstime, all expressed in clock ticks
    times = data[13:17]
    # the values in /proc/<pid>/stat are counted in USER_HZ ticks (normally
    # 100 per second), independent of the kernel's scheduling rate: even with
    # CONFIG_HZ_1000=y the numbers in 'stat' advance in 10ms units
    HZ = 100
    userspace_seconds = int(times[0]) * 1.0 / HZ
    system_seconds = int(times[1]) * 1.0 / HZ
    # child (waited-for) times are parsed here but not currently reported
    child_userspace_seconds = int(times[2]) * 1.0 / HZ
    child_system_seconds = int(times[3]) * 1.0 / HZ
    return (userspace_seconds, system_seconds)


def read_pids_txt():
    processes = []
    for line in open("pids.txt", "r").readlines():
        line = line.strip()
        if not line or line[0] == "#":
            continue
        parts = line.split()
        pidthing = parts[0]
        if len(parts) > 1:
            name = parts[1]
        else:
            name = pidthing
        pid = None
        try:
            pid = int(pidthing)
        except ValueError:
            # not a numeric PID: treat it as a pidfile, or as a directory
            # containing a twistd.pid file
            pidfile = os.path.expanduser(pidthing)
            if os.path.isdir(pidfile):
                pidfile = os.path.join(pidfile, "twistd.pid")
            try:
                pid = int(open(pidfile, "r").read().strip())
            except EnvironmentError:
                pass
        if pid is not None:
            processes.append( (pid, name) )
    return processes
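
# With the hypothetical pids.txt shown near the top of this file,
# read_pids_txt() would return something like
#   [(1234, "webserver"), (4321, "someprocess"), (8765, "storage1")]
# where the last two PIDs come from the named pidfile and the node
# directory's twistd.pid (values invented for illustration).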

Averages = ListOf( TupleOf(str, float, float, float) )
class RICPUWatcherSubscriber(RemoteInterface):
    def averages(averages=Averages):
        return None

class RICPUWatcher(RemoteInterface):
    def get_averages():
        """Return a list of rows, one for each process I am watching. Each
        row is (name, 1-min-avg, 5-min-avg, 15-min-avg), where 'name' is a
        string, and the averages are floats from 0.0 to 1.0. Each average is
        the fraction of the CPU that this process has used: the change in
        CPU time divided by the change in wallclock time.
        """
        return Averages

    def subscribe(observer=RICPUWatcherSubscriber):
        """Arrange for the given observer to get an 'averages' message every
        time the averages are updated. This message will contain a single
        argument, the same list of tuples that get_averages() returns."""
        return None

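# A foolscap client could consume this interface along these lines (an
# illustrative sketch only; error handling and reactor setup are omitted,
# and 'print_rows' is a hypothetical callback):
#
#   from foolscap import Tub
#   tub = Tub()
#   tub.startService()
#   furl = open("watcher.furl", "r").read().strip()
#   d = tub.getReference(furl)
#   d.addCallback(lambda watcher: watcher.callRemote("get_averages"))
#   d.addCallback(print_rows)
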
class CPUWatcher(service.MultiService, resource.Resource, Referenceable):
    implements(RICPUWatcher)
    POLL_INTERVAL = 30 # seconds
    HISTORY_LIMIT = 15 * 60 # 15min
    AVERAGES = (1*60, 5*60, 15*60) # 1min, 5min, 15min

    def __init__(self):
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        try:
            self.history = pickle.load(open("history.pickle", "rb"))
        except:
            # missing or corrupt history file: start with an empty history
            self.history = {}
        self.current = []
        self.observers = set()
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def startService(self):
        service.MultiService.startService(self)

        # if ./webport doesn't name a port, listen on an ephemeral one and
        # record it for later runs
        try:
            desired_webport = open("webport", "r").read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            open("webport", "w").write("tcp:%d\n" % got_port)

        # same pattern for the foolscap Tub: ./tubport names the port, or an
        # ephemeral one is chosen and written back
        self.tub = Tub(certFile="watcher.pem")
        self.tub.setServiceParent(self)
        try:
            desired_tubport = open("tubport", "r").read().strip()
        except EnvironmentError:
            desired_tubport = None
        tubport = desired_tubport or "tcp:0"
        l = self.tub.listenOn(tubport)
        if not desired_tubport:
            got_port = l.getPortnum()
            open("tubport", "w").write("tcp:%d\n" % got_port)
        d = self.tub.setLocationAutomatically()
        d.addCallback(self._tub_ready)
        d.addErrback(log.err)

    def _tub_ready(self, res):
        self.tub.registerReference(self, furlFile="watcher.furl")


    def getChild(self, path, req):
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def render(self, req):
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = "# name, 1min, 5min, 15min\n"
            data += pprint.pformat(self.current) + "\n"
        elif t == "json":
            #data = str(self.current) + "\n" # isn't that convenient? almost.
            data = simplejson.dumps(self.current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data
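
    # The same data is available over plain HTTP. For example, if ./webport
    # ends up containing "tcp:8080" (a made-up port), the JSON form could be
    # fetched with:
    #   curl "http://localhost:8080/?t=json"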

    def remote_get_averages(self):
        return self.current
    def remote_subscribe(self, observer):
        self.observers.add(observer)

    def notify(self, observer):
        d = observer.callRemote("averages", self.current)
        def _error(f):
            log.msg("observer error, removing them")
            log.msg(f)
            self.observers.discard(observer)
        d.addErrback(_error)

    def poll(self):
        max_history = self.HISTORY_LIMIT / self.POLL_INTERVAL
        current = []
        try:
            processes = read_pids_txt()
        except:
            log.err()
            return
        for (pid, name) in processes:
            if pid not in self.history:
                self.history[pid] = []
            now = time.time()
            try:
                (user_seconds, sys_seconds) = read_cpu_times(pid)
                self.history[pid].append( (now, user_seconds, sys_seconds) )
                while len(self.history[pid]) > max_history+1:
                    self.history[pid].pop(0)
            except:
                log.msg("error reading process %s (%s), ignoring" % (pid, name))
                log.err()
        try:
            # persist the history so a restart doesn't lose the moving
            # averages; failure to save is deliberately non-fatal
            pickle.dump(self.history, open("history.pickle.tmp", "wb"))
            os.rename("history.pickle.tmp", "history.pickle")
        except:
            pass
        for (pid, name) in processes:
            row = [name]
            for avg in self.AVERAGES:
                row.append(self._average_N(pid, avg))
            current.append(tuple(row))
        self.current = current
        print current
        for ob in self.observers:
            eventual.eventually(self.notify, ob)

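    # A worked example of the sampling arithmetic, using the constants above:
    # with POLL_INTERVAL=30s, the 5-minute average uses 300/30 = 10 samples
    # (the 11 most recent (time, user, sys) points), and a HISTORY_LIMIT of
    # 15 minutes keeps at most 30+1 points per pid.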
    def _average_N(self, pid, seconds):
        num_samples = seconds / self.POLL_INTERVAL
        samples = self.history[pid]
        if len(samples) < num_samples+1:
            # not enough history yet to compute this average
            return None
        first = -num_samples-1
        elapsed_wall = samples[-1][0] - samples[first][0]
        elapsed_user = samples[-1][1] - samples[first][1]
        elapsed_sys = samples[-1][2] - samples[first][2]
        if elapsed_wall == 0.0:
            return 0.0
        return (elapsed_user+elapsed_sys) / elapsed_wall

application = service.Application("cpu-watcher")
CPUWatcher().setServiceParent(application)