# misc/spacetime/diskwatcher.tac (tahoe-lafs)
1 # -*- python -*-
2
3 """
4 Run this tool with twistd in its own directory, with a file named 'urls.txt'
5 describing which nodes to query. Make sure to copy diskwatcher.py into the
6 same directory. It will request disk-usage numbers from the nodes once per
7 hour (or slower), and store them in a local database. It will compute
8 usage-per-unit time values over several time ranges and make them available
9 through an HTTP query (using ./webport). It will also provide an estimate of
10 how much time is left before the grid's storage is exhausted.
11
12 There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
13 the values this tool computes.
14
15 Each line of urls.txt points to a single node. Each node should have its own
16 dedicated disk: if multiple nodes share a disk, only list one of them in
17 urls.txt (otherwise that space will be double-counted, confusing the
18 results). Each line should be in the form:
19
20  http://host:webport/statistics?t=json
21
22 """
23
24 # TODO:
25 #  built-in graphs on web interface
26
27
28 import os.path, urllib, time
29 from datetime import timedelta
30 from twisted.application import internet, service, strports
31 from twisted.web import server, resource, http, client
32 from twisted.internet import defer
33 from twisted.python import log
34 import simplejson
35 from axiom.attributes import AND
36 from axiom.store import Store
37 from epsilon import extime
38 from diskwatcher import Sample
39
40 #from axiom.item import Item
41 #from axiom.attributes import text, integer, timestamp
42
43 #class Sample(Item):
44 #    url = text()
45 #    when = timestamp()
46 #    used = integer()
47 #    avail = integer()
48
49 #s = Store("history.axiom")
50 #ns = Store("new-history.axiom")
51 #for sa in s.query(Sample):
52 #    diskwatcher.Sample(store=ns,
53 #                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
54 #print "done"
55
56 HOUR = 3600
57 DAY = 24*3600
58 WEEK = 7*DAY
59 MONTH = 30*DAY
60 YEAR = 365*DAY
61
62 class DiskWatcher(service.MultiService, resource.Resource):
63     POLL_INTERVAL = 1*HOUR
64     AVERAGES = {#"60s": 60,
65                 #"5m": 5*60,
66                 #"30m": 30*60,
67                 "1hr": 1*HOUR,
68                 "1day": 1*DAY,
69                 "2wk": 2*WEEK,
70                 "4wk": 4*WEEK,
71                 }
72
73     def __init__(self):
74         assert os.path.exists("diskwatcher.tac") # run from the right directory
75         self.growth_cache = {}
76         service.MultiService.__init__(self)
77         resource.Resource.__init__(self)
78         self.store = Store("history.axiom")
79         self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
80         service.IService(self.store).setServiceParent(self) # let upgrader run
81         ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
82         ts.setServiceParent(self)
83
84     def _upgrade_complete(self, ignored):
85         print "Axiom store upgrade complete"
86
87     def startService(self):
88         service.MultiService.startService(self)
89
90         try:
91             desired_webport = open("webport", "r").read().strip()
92         except EnvironmentError:
93             desired_webport = None
94         webport = desired_webport or "tcp:0"
95         root = self
96         serv = strports.service(webport, server.Site(root))
97         serv.setServiceParent(self)
98         if not desired_webport:
99             got_port = serv._port.getHost().port
100             open("webport", "w").write("tcp:%d\n" % got_port)
101
102
103     def get_urls(self):
104         for url in open("urls.txt","r").readlines():
105             if "#" in url:
106                 url = url[:url.find("#")]
107             url = url.strip()
108             if not url:
109                 continue
110             yield url
111
112     def poll(self):
113         log.msg("polling..")
114         #return self.poll_synchronous()
115         return self.poll_asynchronous()
116
117     def poll_asynchronous(self):
118         # this didn't actually seem to work any better than poll_synchronous:
119         # logs are more noisy, and I got frequent DNS failures. But with a
120         # lot of servers to query, this is probably the better way to go. A
121         # significant advantage of this approach is that we can use a
122         # timeout= argument to tolerate hanging servers.
123         dl = []
124         for url in self.get_urls():
125             when = extime.Time()
126             d = client.getPage(url, timeout=60)
127             d.addCallback(self.got_response, when, url)
128             dl.append(d)
129         d = defer.DeferredList(dl)
130         def _done(res):
131             fetched = len([1 for (success, value) in res if success])
132             log.msg("fetched %d of %d" % (fetched, len(dl)))
133         d.addCallback(_done)
134         return d
135
136     def poll_synchronous(self):
137         attempts = 0
138         fetched = 0
139         for url in self.get_urls():
140             attempts += 1
141             try:
142                 when = extime.Time()
143                 # if a server accepts the connection and then hangs, this
144                 # will block forever
145                 data_json = urllib.urlopen(url).read()
146                 self.got_response(data_json, when, url)
147                 fetched += 1
148             except:
149                 log.msg("error while fetching: %s" % url)
150                 log.err()
151         log.msg("fetched %d of %d" % (fetched, attempts))
152
153     def got_response(self, data_json, when, url):
154         data = simplejson.loads(data_json)
155         total = data[u"stats"][u"storage_server.disk_total"]
156         used = data[u"stats"][u"storage_server.disk_used"]
157         avail = data[u"stats"][u"storage_server.disk_avail"]
158         print "%s : total=%s, used=%s, avail=%s" % (url,
159                                                     total, used, avail)
160         Sample(store=self.store,
161                url=unicode(url), when=when, total=total, used=used, avail=avail)
162
163     def calculate_growth_timeleft(self):
164         timespans = []
165         total_avail_space = self.find_total_available_space()
166         pairs = [ (timespan,name)
167                   for name,timespan in self.AVERAGES.items() ]
168         pairs.sort()
169         for (timespan,name) in pairs:
170             growth = self.growth(timespan)
171             print name, total_avail_space, growth
172             if growth is not None:
173                 timeleft = None
174                 if growth > 0:
175                     timeleft = total_avail_space / growth
176                 timespans.append( (name, timespan, growth, timeleft) )
177         return timespans
178
179     def find_total_space(self):
180         # this returns the sum of disk-avail stats for all servers that 1)
181         # are listed in urls.txt and 2) have responded recently.
182         now = extime.Time()
183         recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
184         total_space = 0
185         for url in self.get_urls():
186             url = unicode(url)
187             latest = list(self.store.query(Sample,
188                                            AND(Sample.url == url,
189                                                Sample.when > recent),
190                                            sort=Sample.when.descending,
191                                            limit=1))
192             if latest:
193                 total_space += latest[0].total
194         return total_space
195
196     def find_total_available_space(self):
197         # this returns the sum of disk-avail stats for all servers that 1)
198         # are listed in urls.txt and 2) have responded recently.
199         now = extime.Time()
200         recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
201         total_avail_space = 0
202         for url in self.get_urls():
203             url = unicode(url)
204             latest = list(self.store.query(Sample,
205                                            AND(Sample.url == url,
206                                                Sample.when > recent),
207                                            sort=Sample.when.descending,
208                                            limit=1))
209             if latest:
210                 total_avail_space += latest[0].avail
211         return total_avail_space
212
213     def find_total_used_space(self):
214         # this returns the sum of disk-used stats for all servers that 1) are
215         # listed in urls.txt and 2) have responded recently.
216         now = extime.Time()
217         recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
218         total_used_space = 0
219         for url in self.get_urls():
220             url = unicode(url)
221             latest = list(self.store.query(Sample,
222                                            AND(Sample.url == url,
223                                                Sample.when > recent),
224                                            sort=Sample.when.descending,
225                                            limit=1))
226             if latest:
227                 total_used_space += latest[0].used
228         return total_used_space
229
230
231     def growth(self, timespan):
232         """Calculate the bytes-per-second growth of the total disk-used stat,
233         over a period of TIMESPAN seconds (i.e. between the most recent
234         sample and the latest one that's at least TIMESPAN seconds ago),
235         summed over all nodes which 1) are listed in urls.txt, 2) have
236         responded recently, and 3) have a response at least as old as
237         TIMESPAN. If there are no nodes which meet these criteria, we'll
238         return None; this is likely to happen for the longer timespans (4wk)
239         until the gatherer has been running and collecting data for that
240         long."""
241
242         # a note about workload: for our oldest storage servers, as of
243         # 25-Jan-2009, the first DB query here takes about 40ms per server
244         # URL (some take as little as 10ms). There are about 110 servers, and
245         # two queries each, so the growth() function takes about 7s to run
246         # for each timespan. We track 4 timespans, and find_total_*_space()
247         # takes about 2.3s to run, so calculate_growth_timeleft() takes about
248         # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
249         # which perform HTTP queries every 5 minutes. By adding growth_cache(),
250         # I hope to reduce this: the first HTTP query will still take 27s,
251         # but the subsequent five should be about 2.3s each.
252
253         # we're allowed to cache this value for 3 minutes
254         if timespan in self.growth_cache:
255             (when, value) = self.growth_cache[timespan]
256             if time.time() - when < 3*60:
257                 return value
258
259         td = timedelta(seconds=timespan)
260         now = extime.Time()
261         then = now - td
262         recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
263
264         total_growth = 0.0
265         num_nodes = 0
266
267         for url in self.get_urls():
268             url = unicode(url)
269             latest = list(self.store.query(Sample,
270                                            AND(Sample.url == url,
271                                                Sample.when > recent),
272                                            sort=Sample.when.descending,
273                                            limit=1))
274             if not latest:
275                 #print "no latest sample from", url
276                 continue # skip this node
277             latest = latest[0]
278             old = list(self.store.query(Sample,
279                                         AND(Sample.url == url,
280                                             Sample.when < then),
281                                         sort=Sample.when.descending,
282                                         limit=1))
283             if not old:
284                 #print "no old sample from", url
285                 continue # skip this node
286             old = old[0]
287             duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
288             if not duration:
289                 print "only one sample from", url
290                 continue
291
292             rate = float(latest.used - old.used) / duration
293             #print url, rate
294             total_growth += rate
295             num_nodes += 1
296
297         if not num_nodes:
298             return None
299         self.growth_cache[timespan] = (time.time(), total_growth)
300         return total_growth
301
302     def getChild(self, path, req):
303         if path == "":
304             return self
305         return resource.Resource.getChild(self, path, req)
306
307     def abbreviate_time(self, s):
308         def _plural(count, unit):
309             count = int(count)
310             if count == 1:
311                 return "%d %s" % (count, unit)
312             return "%d %ss" % (count, unit)
313         if s is None:
314             return "unknown"
315         if s < 120:
316             return _plural(s, "second")
317         if s < 3*HOUR:
318             return _plural(s/60, "minute")
319         if s < 2*DAY:
320             return _plural(s/HOUR, "hour")
321         if s < 2*MONTH:
322             return _plural(s/DAY, "day")
323         if s < 4*YEAR:
324             return _plural(s/MONTH, "month")
325         return _plural(s/YEAR, "year")
326
327     def abbreviate_space2(self, s, SI=True):
328         if s is None:
329             return "unknown"
330         if SI:
331             U = 1000.0
332             isuffix = "B"
333         else:
334             U = 1024.0
335             isuffix = "iB"
336         def r(count, suffix):
337             return "%.2f %s%s" % (count, suffix, isuffix)
338
339         if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
340             return r(s, "")
341         if s < U*U:
342             return r(s/U, "k")
343         if s < U*U*U:
344             return r(s/(U*U), "M")
345         if s < U*U*U*U:
346             return r(s/(U*U*U), "G")
347         if s < U*U*U*U*U:
348             return r(s/(U*U*U*U), "T")
349         return r(s/(U*U*U*U*U), "P")
350
351     def abbreviate_space(self, s):
352         return "(%s, %s)" % (self.abbreviate_space2(s, True),
353                              self.abbreviate_space2(s, False))
354
355     def render(self, req):
356         t = req.args.get("t", ["html"])[0]
357         ctype = "text/plain"
358         data = ""
359         if t == "html":
360             data = ""
361             for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
362                 data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
363                         (growth, self.abbreviate_space2(growth, True),
364                          self.abbreviate_time(timeleft), name)
365             used = self.find_total_used_space()
366             data += "total used: %d bytes %s\n" % (used,
367                                                    self.abbreviate_space(used))
368             total = self.find_total_space()
369             data += "total space: %d bytes %s\n" % (total,
370                                                     self.abbreviate_space(total))
371         elif t == "json":
372             current = {"rates": self.calculate_growth_timeleft(),
373                        "total": self.find_total_space(),
374                        "used": self.find_total_used_space(),
375                        "available": self.find_total_available_space(),
376                        }
377             data = simplejson.dumps(current, indent=True)
378         else:
379             req.setResponseCode(http.BAD_REQUEST)
380             data = "Unknown t= %s\n" % t
381         req.setHeader("content-type", ctype)
382         return data
383
# twistd entry point: build the Application and attach the watcher service
application = service.Application("disk-watcher")
watcher = DiskWatcher()
watcher.setServiceParent(application)