"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. Make sure to copy diskwatcher.py into the
same directory. It will request disk-usage numbers from the nodes once per
hour (or slower), and store them in a local database. It will compute
usage-per-unit-time values over several time ranges and make them available
through an HTTP query (using ./webport). It will also provide an estimate of
how much time is left before the grid's storage is exhausted.

There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
the values this tool computes.

Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, list only one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be in the form:

 http://host:webport/statistics?t=json
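
For instance, a two-node grid might use something like this (hostnames and
port numbers here are hypothetical):

 http://tahoe-a.example.com:3456/statistics?t=json
 http://tahoe-b.example.com:3456/statistics?t=json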
"""

# TODO:
#  built-in graphs on web interface

import os.path, urllib, time
from datetime import timedelta
import simplejson
from twisted.application import internet, service, strports
from twisted.web import server, resource, http, client
from twisted.internet import defer
from twisted.python import log

from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime
from diskwatcher import Sample

#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp

# one-time schema-migration snippet, kept commented out for reference:
#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
#    diskwatcher.Sample(store=ns,
#                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)

# time-unit constants, in seconds (MONTH and YEAR use the usual 30-day and
# 365-day approximations)
HOUR = 3600
DAY = 24*HOUR
WEEK = 7*DAY
MONTH = 30*DAY
YEAR = 365*DAY

class DiskWatcher(service.MultiService, resource.Resource):
    POLL_INTERVAL = 1*HOUR
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }

    def __init__(self):
        assert os.path.exists("diskwatcher.tac") # run from the right directory
        self.growth_cache = {}
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        print "Axiom store upgrade complete"

    def startService(self):
        service.MultiService.startService(self)

        try:
            desired_webport = open("webport", "r").read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            open("webport", "w").write("tcp:%d\n" % got_port)

    def get_urls(self):
        for url in open("urls.txt", "r").readlines():
            if "#" in url:
                url = url[:url.find("#")] # strip trailing comments
            url = url.strip()
            if url:
                yield url

    def poll(self):
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        # this didn't actually seem to work any better than poll_synchronous:
        # logs are more noisy, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            d = client.getPage(url, timeout=60)
            d.addCallback(self.got_response, when, url)
            dl.append(d)
        d = defer.DeferredList(dl)
        def _done(res):
            fetched = len([1 for (success, value) in res if success])
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d

    def poll_synchronous(self):
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                data_json = urllib.urlopen(url).read()
                self.got_response(data_json, when, url)
                fetched += 1
            except:
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))

    def got_response(self, data_json, when, url):
        data = simplejson.loads(data_json)
        total = data[u"stats"][u"storage_server.disk_total"]
        used = data[u"stats"][u"storage_server.disk_used"]
        avail = data[u"stats"][u"storage_server.disk_avail"]
        print "%s : total=%s, used=%s, avail=%s" % (url,
                                                    total, used, avail)
        Sample(store=self.store,
               url=unicode(url), when=when, total=total, used=used, avail=avail)
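
    # For reference, the statistics response this parses looks roughly like
    # the following (abbreviated; the byte counts are made-up illustrations,
    # only the key names are taken from the code above):
    #   {"stats": {"storage_server.disk_total": 1000000000,
    #              "storage_server.disk_used": 250000000,
    #              "storage_server.disk_avail": 750000000}}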

    def calculate_growth_timeleft(self):
        timespans = []
        total_avail_space = self.find_total_available_space()
        pairs = [ (timespan,name)
                  for name,timespan in self.AVERAGES.items() ]
        pairs.sort()
        for (timespan,name) in pairs:
            growth = self.growth(timespan)
            print name, total_avail_space, growth
            if growth is not None:
                timeleft = None
                if growth > 0:
                    timeleft = total_avail_space / growth
                timespans.append( (name, timespan, growth, timeleft) )
        return timespans
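
    # Worked example (illustrative numbers, not measurements): with 50 TB
    # (5.0e13 bytes) available and a measured growth of 5.0e6 bytes/sec over
    # the "1day" range, timeleft = 5.0e13 / 5.0e6 = 1.0e7 seconds, or about
    # 115 days until the grid's storage is exhausted at that rate.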

    def find_total_space(self):
        # this returns the sum of disk-total stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_space += latest[0].total
        return total_space

    def find_total_available_space(self):
        # this returns the sum of disk-avail stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_avail_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_avail_space += latest[0].avail
        return total_avail_space

    def find_total_used_space(self):
        # this returns the sum of disk-used stats for all servers that 1) are
        # listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_used_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_used_space += latest[0].used
        return total_used_space

    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent
        sample and the latest one that's at least TIMESPAN seconds ago),
        summed over all nodes which 1) are listed in urls.txt, 2) have
        responded recently, and 3) have a response at least as old as
        TIMESPAN. If there are no nodes which meet these criteria, we'll
        return None; this is likely to happen for the longer timespans (4wk)
        until the gatherer has been running and collecting data for that
        long."""
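
        # For example (illustrative numbers): if a node's disk-used grew
        # from 1.000e9 to 1.432e9 bytes over a 24-hour window, its rate is
        # (1.432e9 - 1.000e9) / 86400 = 5000 bytes/sec; these per-node rates
        # are summed across all qualifying nodes to give the result.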

        # a note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers, and
        # two queries each, so the growth() function takes about 7s to run
        # for each timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes about
        # 27s. Each HTTP query thus takes 27s, and we have six munin plugins
        # which perform HTTP queries every 5 minutes. By adding growth_cache(),
        # I hope to reduce this: the first HTTP query will still take 27s,
        # but the subsequent five should be about 2.3s each.

        # we're allowed to cache this value for 3 minutes
        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < 3*60:
                return value

        td = timedelta(seconds=timespan)
        now = extime.Time()
        then = now - td
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if not latest:
                #print "no latest sample from", url
                continue # skip this node
            latest = latest[0]
            old = list(self.store.query(Sample,
                                        AND(Sample.url == url,
                                            Sample.when < then),
                                        sort=Sample.when.descending,
                                        limit=1))
            if not old:
                #print "no old sample from", url
                continue # skip this node
            old = old[0]
            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                print "only one sample from", url
                continue # no usable baseline yet

            rate = float(latest.used - old.used) / duration
            total_growth += rate
            num_nodes += 1

        if not num_nodes:
            return None
        self.growth_cache[timespan] = (time.time(), total_growth)
        return total_growth

    def getChild(self, path, req):
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)

    def abbreviate_time(self, s):
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")
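
    # e.g. abbreviate_time(90) -> "90 seconds"; abbreviate_time(3*DAY) ->
    # "3 days"; abbreviate_time(None) -> "unknown".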

    def abbreviate_space2(self, s, SI=True):
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return r(s, "")
        if s < U*U:
            return r(s/U, "k")
        if s < U*U*U:
            return r(s/(U*U), "M")
        if s < U*U*U*U:
            return r(s/(U*U*U), "G")
        if s < U*U*U*U*U:
            return r(s/(U*U*U*U), "T")
        return r(s/(U*U*U*U*U), "P")

    def abbreviate_space(self, s):
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))
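
    # e.g. abbreviate_space2(1234567, SI=True) -> "1.23 MB" and
    # abbreviate_space2(1234567, SI=False) -> "1.18 MiB";
    # abbreviate_space() reports both forms together.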

    def render(self, req):
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
                        (growth, self.abbreviate_space2(growth, True),
                         self.abbreviate_time(timeleft), name)
            used = self.find_total_used_space()
            data += "total used: %d bytes %s\n" % (used,
                                                   self.abbreviate_space(used))
            total = self.find_total_space()
            data += "total space: %d bytes %s\n" % (total,
                                                    self.abbreviate_space(total))
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = simplejson.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data

application = service.Application("disk-watcher")
DiskWatcher().setServiceParent(application)
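
# A minimal usage sketch (port hypothetical): run "twistd -y diskwatcher.tac"
# in a directory containing urls.txt, then query the status page with
#   curl 'http://localhost:8080/?t=json'
# substituting whatever port ./webport records; t=html (the default) returns
# the same numbers as human-readable text.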