From 46d0c9c995d383ae8d99e7bda53cb40fb997e4ec Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Wed, 6 Aug 2008 21:22:22 -0700
Subject: [PATCH] disk-watcher: first draft of a daemon to use the HTTP stats
 interface and its new storage_server.disk_avail feature, to track changes in
 disk space over time

---
 misc/spacetime/diskwatcher.py  |  14 ++
 misc/spacetime/diskwatcher.tac | 256 +++++++++++++++++++++++++++++++++
 2 files changed, 270 insertions(+)
 create mode 100644 misc/spacetime/diskwatcher.py
 create mode 100644 misc/spacetime/diskwatcher.tac

diff --git a/misc/spacetime/diskwatcher.py b/misc/spacetime/diskwatcher.py
new file mode 100644
index 00000000..7de54c9f
--- /dev/null
+++ b/misc/spacetime/diskwatcher.py
@@ -0,0 +1,14 @@
+
+# put this definition in a separate file, because axiom uses the
+# fully-qualified classname as a database table name, so __builtin__ is kinda
+# ugly.
+
+from axiom.item import Item
+from axiom.attributes import text, integer, timestamp
+
+class Sample(Item):
+    url = text()
+    when = timestamp(indexed=True)
+    used = integer()
+    avail = integer()
+
diff --git a/misc/spacetime/diskwatcher.tac b/misc/spacetime/diskwatcher.tac
new file mode 100644
index 00000000..b62b4415
--- /dev/null
+++ b/misc/spacetime/diskwatcher.tac
@@ -0,0 +1,256 @@
+# -*- python -*-
+
+"""
+Run this tool with twistd in its own directory, with a file named 'urls.txt'
+describing which nodes to query. It will request disk-usage numbers from the
+nodes once per hour (or slower), and store them in a local database. It will
+compute usage-per-unit time values over several time ranges and make them
+available through an HTTP query (using ./webport). It will also provide an
+estimate of how much time is left before the grid's storage is exhausted.
+
+Each line of urls.txt points to a single node. Each node should have its own
+dedicated disk: if multiple nodes share a disk, only list one of them in
+urls.txt (otherwise that space will be double-counted, confusing the
+results). Each line should be in the form:
+
+ http://host:webport/statistics?t=json
+
+"""
+
+# TODO:
+#  built-in graphs on web interface
+
+
+import os.path, pprint, time, urllib
+from datetime import timedelta
+from twisted.application import internet, service, strports
+from twisted.web import server, resource, http
+from twisted.python import log
+import simplejson
+from axiom.attributes import AND
+from axiom.store import Store
+from epsilon import extime
+from diskwatcher import Sample
+
+#from axiom.item import Item
+#from axiom.attributes import text, integer, timestamp
+
+#class Sample(Item):
+#    url = text()
+#    when = timestamp()
+#    used = integer()
+#    avail = integer()
+
+#s = Store("history.axiom")
+#ns = Store("new-history.axiom")
+#for sa in s.query(Sample):
+#    diskwatcher.Sample(store=ns,
+#                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
+#print "done"
+
+HOUR = 3600
+DAY = 24*3600
+WEEK = 7*DAY
+MONTH = 30*DAY
+YEAR = 365*DAY
+
+class DiskWatcher(service.MultiService, resource.Resource):
+    POLL_INTERVAL = 60#1*HOUR
+    AVERAGES = {"60s": 60,
+                "5m": 5*60,
+                "30m": 30*60,
+                "1hr": 1*HOUR,
+                "1day": 1*DAY,
+                "2wk": 2*WEEK,
+                "4wk": 4*WEEK,
+                }
+
+    def __init__(self):
+        assert os.path.exists("diskwatcher.tac") # run from the right directory
+        service.MultiService.__init__(self)
+        resource.Resource.__init__(self)
+        self.store = Store("history.axiom")
+        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
+        ts.setServiceParent(self)
+
+    def startService(self):
+        service.MultiService.startService(self)
+
+        try:
+            desired_webport = open("webport", "r").read().strip()
+        except EnvironmentError:
+            desired_webport = None
+        webport = desired_webport or "tcp:0"
+        root = self
+        serv = strports.service(webport, server.Site(root))
+        serv.setServiceParent(self)
+        if not desired_webport:
+            got_port = serv._port.getHost().port
+            open("webport", "w").write("tcp:%d\n" % got_port)
+
+
+    def get_urls(self):
+        for url in open("urls.txt","r").readlines():
+            if "#" in url:
+                url = url[:url.find("#")]
+            url = url.strip()
+            if not url:
+                continue
+            yield url
+
+    def poll(self):
+        log.msg("polling..")
+        attempts = 0
+        fetched = 0
+        for url in self.get_urls():
+            attempts += 1
+            try:
+                when = extime.Time()
+                data = simplejson.load(urllib.urlopen(url))
+                total = data[u"stats"][u"storage_server.disk_total"]
+                used = data[u"stats"][u"storage_server.disk_used"]
+                avail = data[u"stats"][u"storage_server.disk_avail"]
+                #print "%s : total=%s, used=%s, avail=%s" % (url,
+                #                                            total, used, avail)
+                s = Sample(store=self.store,
+                           url=unicode(url), when=when, used=used, avail=avail)
+                fetched += 1
+            except:
+                log.msg("error while fetching: %s" % url)
+                log.err()
+        log.msg("fetched %d of %d" % (fetched, attempts))
+
+    def calculate(self):
+        timespans = []
+        total_avail_space = self.find_total_avail_space()
+        pairs = [ (timespan,name)
+                  for name,timespan in self.AVERAGES.items() ]
+        pairs.sort()
+        for (timespan,name) in pairs:
+            growth = self.growth(timespan)
+            print name, total_avail_space, growth
+            if growth is not None:
+                try:
+                    timeleft = total_avail_space / growth
+                except ZeroDivisionError:
+                    timeleft = None
+                timespans.append( (name, timespan, growth, timeleft) )
+        return timespans
+
+    def find_total_avail_space(self):
+        # this returns the sum of disk-avail stats for all servers that 1)
+        # are listed in urls.txt and 2) have responded recently.
+        now = extime.Time()
+        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
+        total_avail_space = 0
+        for url in self.get_urls():
+            url = unicode(url)
+            latest = list(self.store.query(Sample,
+                                           AND(Sample.url == url,
+                                               Sample.when > recent),
+                                           sort=Sample.when.descending,
+                                           limit=1))
+            if latest:
+                total_avail_space += latest[0].avail
+        return total_avail_space
+
+
+    def growth(self, timespan):
+        """Calculate the bytes-per-second growth of the total disk-used stat,
+        over a period of TIMESPAN seconds (i.e. between the most recent
+        sample and the latest one that's at least TIMESPAN seconds ago),
+        summed over all nodes which 1) are listed in urls.txt, 2) have
+        responded recently, and 3) have a response at least as old as
+        TIMESPAN. If there are no nodes which meet these criteria, we'll
+        return None; this is likely to happen for the longer timespans (4wk)
+        until the gatherer has been running and collecting data for that
+        long."""
+
+        td = timedelta(seconds=timespan)
+        now = extime.Time()
+        then = now - td
+        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
+
+        total_growth = 0.0
+        num_nodes = 0
+
+        for url in self.get_urls():
+            url = unicode(url)
+            latest = list(self.store.query(Sample,
+                                           AND(Sample.url == url,
+                                               Sample.when > recent),
+                                           sort=Sample.when.descending,
+                                           limit=1))
+            if not latest:
+                #print "no latest sample from", url
+                continue # skip this node
+            latest = latest[0]
+            old = list(self.store.query(Sample,
+                                        AND(Sample.url == url,
+                                            Sample.when < then),
+                                        sort=Sample.when.descending,
+                                        limit=1))
+            if not old:
+                #print "no old sample from", url
+                continue # skip this node
+            old = old[0]
+            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
+            if not duration:
+                print "only one sample from", url
+                continue
+
+            rate = float(latest.used - old.used) / duration
+            #print url, rate
+            total_growth += rate
+            num_nodes += 1
+
+        if not num_nodes:
+            return None
+        return total_growth
+
+    def getChild(self, path, req):
+        if path == "":
+            return self
+        return resource.Resource.getChild(self, path, req)
+
+    def abbreviate_time(self, s):
+        def _plural(count, unit):
+            count = int(count)
+            if count == 1:
+                return "%d %s" % (count, unit)
+            return "%d %ss" % (count, unit)
+        if s is None:
+            return "unknown"
+        if s < 120:
+            return _plural(s, "second")
+        if s < 3*HOUR:
+            return _plural(s/60, "minute")
+        if s < 2*DAY:
+            return _plural(s/HOUR, "hour")
+        if s < 2*MONTH:
+            return _plural(s/DAY, "day")
+        if s < 4*YEAR:
+            return _plural(s/MONTH, "month")
+        return _plural(s/YEAR, "year")
+
+    def render(self, req):
+        t = req.args.get("t", ["html"])[0]
+        ctype = "text/plain"
+        data = ""
+        if t == "html":
+            data = ""
+            for (name, timespan, growth, timeleft) in self.calculate():
+                data += "%f bytes per second, %s remaining (over %s)\n" % \
+                        (growth, self.abbreviate_time(timeleft), name)
+        elif t == "json":
+            current = self.calculate()
+            #data = str(current) + "\n" # isn't that convenient? almost.
+            data = simplejson.dumps(current, indent=True)
+        else:
+            req.setResponseCode(http.BAD_REQUEST)
+            data = "Unknown t= %s\n" % t
+        req.setHeader("content-type", ctype)
+        return data
+
+application = service.Application("disk-watcher")
+DiskWatcher().setServiceParent(application)
-- 
2.45.2