stats: add a simple stats gathering system
authorrobk-tahoe <robk-tahoe@allmydata.com>
Thu, 31 Jan 2008 03:11:07 +0000 (20:11 -0700)
committerrobk-tahoe <robk-tahoe@allmydata.com>
Thu, 31 Jan 2008 03:11:07 +0000 (20:11 -0700)
We want to collect runtime statistics from multiple nodes, primarily
for server monitoring purposes.  This is a simple implementation of
such a system, intended as a skeleton to build more sophistication upon.

Each client now looks for a 'stats_gatherer.furl' config file.  If it has
been configured to use a stats gatherer, then it instantiates internally
a StatsProvider.  This is a central place for code which wishes to offer
stats up for monitoring to report them to, either by calling
stats_provider.count('stat.name', value) to increment a counter, or by
registering a class as a stats producer with sp.register_producer(obj).

The StatsProvider connects to the StatsGatherer server and registers
itself as a provider upon startup.  The StatsGatherer is then responsible
for polling the attached providers periodically to retrieve their data.
The provider queries each registered producer when the gatherer queries
the provider.  Both the internal 'counters' and the queried 'stats' are
then reported to the gatherer.

This provides a simple gatherer app (cf. 'make stats-gatherer-run'),
which prints its furl and listens for incoming connections.  Once a
minute, the gatherer polls all connected providers, and writes the
retrieved data into a pickle file.

Also included is a munin plugin which knows how to read the gatherer's
stats.pickle and output data munin can interpret.  This plugin,
tahoe-stats.py, can be symlinked under multiple different names within
munin's 'plugins' directory, and inspects argv to determine which
data to display, doing a lookup in a table within that file.
It looks in the environment for 'statsfile' to determine the path to
the gatherer's stats.pickle.  An example plugins-conf.d file is
provided.

Makefile
misc/munin/tahoe-stats.plugin-conf [new file with mode: 0644]
misc/munin/tahoe-stats.py [new file with mode: 0644]
src/allmydata/client.py
src/allmydata/interfaces.py
src/allmydata/stats.py [new file with mode: 0644]
src/allmydata/storage.py

index 8600bbd0933cb6179f36a80123d6d8ad4ba8bc48..7150a1b59f49b8ecb72b18714a36a3ff3850ff61 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -440,3 +440,8 @@ mac-cleanup:
 mac-dbg:
        cd mac && $(PP) $(PYTHON)w allmydata_tahoe.py
 
+# This target runs a stats gatherer server
+.PHONY: stats-gatherer-run
+stats-gatherer-run:
+       cd stats_gatherer && $(PP) $(PYTHON) ../src/allmydata/stats.py
+
diff --git a/misc/munin/tahoe-stats.plugin-conf b/misc/munin/tahoe-stats.plugin-conf
new file mode 100644 (file)
index 0000000..2084c65
--- /dev/null
@@ -0,0 +1,12 @@
+[tahoe_storage_allocated]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_consumed]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_runtime_load_avg]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_runtime_load_peak]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_bytes_added]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_bytes_freed]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
diff --git a/misc/munin/tahoe-stats.py b/misc/munin/tahoe-stats.py
new file mode 100644 (file)
index 0000000..1f09b3c
--- /dev/null
@@ -0,0 +1,145 @@
+#!/usr/bin/python
+
+import os
+import pickle
+import re
+import sys
+
+# Table of the munin graphs this plugin can render, keyed by the name the
+# plugin is invoked as (the basename of argv[0], minus any '.py' suffix).
+# Each entry holds:
+#   statid       - key of the value to plot within one node's data
+#   category     - which sub-dictionary of the node's data holds statid
+#                  ('stats' or 'counters')
+#   configheader - graph-wide lines printed in response to 'config'
+#   graph_config - per-node template printed in response to 'config'
+#   graph_render - per-node template printed when reporting values
+# The two templates are %-formatted with 'name' and 'value' keys.
+PLUGINS = {
+    'tahoe_storage_consumed':
+        { 'statid': 'storage_server.consumed',
+          'category': 'stats',
+          'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Consumed',
+                                     'graph_vlabel bytes',
+                                     'graph_category tahoe_storage_server',
+                                     'graph_info This graph shows space consumed',
+                                     'graph_args --base 1024',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+    'tahoe_storage_allocated':
+        { 'statid': 'storage_server.allocated',
+          'category': 'stats',
+          'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Allocated',
+                                     'graph_vlabel bytes',
+                                     'graph_category tahoe_storage_server',
+                                     'graph_info This graph shows space allocated',
+                                     'graph_args --base 1024',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+
+    'tahoe_runtime_load_avg':
+        { 'statid': 'load_monitor.avg_load',
+          'category': 'stats',
+          'configheader': '\n'.join(['graph_title Tahoe Runtime Load Average',
+                                     'graph_vlabel load',
+                                     'graph_category tahoe',
+                                     'graph_info This graph shows average reactor delay',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+    'tahoe_runtime_load_peak':
+        { 'statid': 'load_monitor.max_load',
+          'category': 'stats',
+          'configheader': '\n'.join(['graph_title Tahoe Runtime Load Peak',
+                                     'graph_vlabel load',
+                                     'graph_category tahoe',
+                                     'graph_info This graph shows peak reactor delay',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+
+    'tahoe_storage_bytes_added':
+        { 'statid': 'storage_server.bytes_added',
+          'category': 'counters',
+          'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Added',
+                                     'graph_vlabel bytes',
+                                     'graph_category tahoe_storage_server',
+                                     'graph_info This graph shows cumulative bytes added',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+    'tahoe_storage_bytes_freed':
+        { 'statid': 'storage_server.bytes_freed',
+          'category': 'counters',
+          'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Removed',
+                                     'graph_vlabel bytes',
+                                     'graph_category tahoe_storage_server',
+                                     'graph_info This graph shows cumulative bytes removed',
+                                     ]),
+          'graph_config': '\n'.join(['%(name)s.label %(name)s',
+                                     '%(name)s.draw LINE1',
+                                     ]),
+          'graph_render': '\n'.join(['%(name)s.value %(value)s',
+                                     ]),
+        },
+
+    }
+
+def smash_name(name):
+    """Return name with every character that is not an ASCII letter or
+    digit replaced by an underscore."""
+    unsafe_chars = re.compile('[^a-zA-Z0-9]')
+    return unsafe_chars.sub('_', name)
+
+def open_stats(fname):
+    """Load and return the object pickled in the file named fname.
+
+    The file is closed even if unpickling fails.
+    """
+    # NOTE: unpickling can execute arbitrary code taken from the file, so
+    # fname must only ever name a file written by a trusted gatherer.
+    f = open(fname, 'rb')
+    try:
+        return pickle.load(f)
+    finally:
+        f.close()
+
+def main(argv):
+    """Emit munin output for the graph this plugin was invoked as.
+
+    The graph is selected by the basename of argv[0] (with any '.py'
+    suffix removed), looked up in PLUGINS.  The gatherer's pickle file is
+    named by the first environment variable whose name starts with
+    'statsfile'.  If argv[1] is 'config', the graph configuration is
+    printed and the process exits with status 0; otherwise the current
+    values are printed.
+
+    Raises RuntimeError if the plugin name is not present in PLUGINS or
+    if no 'statsfile' environment variable is found.
+    """
+    graph_name = os.path.basename(argv[0])
+    if graph_name.endswith('.py'):
+        graph_name = graph_name[:-3]
+
+    plugin_conf = PLUGINS.get(graph_name)
+    if plugin_conf is None:
+        # fail with a clear message instead of a TypeError further down
+        raise RuntimeError("Unknown plugin name %r" % (graph_name,))
+
+    for k,v in os.environ.items():
+        if k.startswith('statsfile'):
+            stats_file = v
+            break
+    else:
+        raise RuntimeError("No 'statsfile' env var found")
+
+    stats = open_stats(stats_file)
+
+    # these do not vary per node, so look them up once
+    category = plugin_conf['category']
+    statid = plugin_conf['statid']
+
+    def output_nodes(output_section):
+        # print one filled-in template per node that reports this stat
+        for tubid, nodestats in stats.items():
+            name = smash_name("%s_%s" % (nodestats['nickname'], tubid[:8]))
+            value = nodestats['stats'][category].get(statid)
+            if value is not None:
+                args = { 'name': name, 'value': value }
+                print plugin_conf[output_section] % args
+
+    # use the argv we were given rather than reaching back into sys.argv
+    if len(argv) > 1:
+        if argv[1] == 'config':
+            print plugin_conf['configheader']
+            output_nodes('graph_config')
+            sys.exit(0)
+
+    output_nodes('graph_render')
+
+if __name__ == '__main__':
+    main(sys.argv)
index 2ef50cce53c0dcef6df41c9c54f6342e0d154348..65a49a58785504da36c1d9c6813a80322aedf3c8 100644 (file)
@@ -21,6 +21,7 @@ from allmydata.util import hashutil, idlib, testutil
 from allmydata.filenode import FileNode
 from allmydata.dirnode import NewDirectoryNode
 from allmydata.mutable import MutableFileNode
+from allmydata.stats import StatsProvider
 from allmydata.interfaces import IURI, INewDirectoryURI, \
      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
 
@@ -56,6 +57,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.logSource="Client"
         self.my_furl = None
         self.introducer_client = None
+        self.init_stats_provider()
         self.init_lease_secret()
         self.init_storage()
         self.init_options()
@@ -79,6 +81,15 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         if webport:
             self.init_web(webport) # strports string
 
+    def init_stats_provider(self):
+        """Create and attach a StatsProvider if 'stats_gatherer.furl' is
+        configured; otherwise leave self.stats_provider set to None."""
+        gatherer_furl = self.get_config('stats_gatherer.furl')
+        if not gatherer_furl:
+            self.stats_provider = None
+            return
+        provider = StatsProvider(self.tub, self.get_config('nickname'),
+                                 gatherer_furl)
+        self.stats_provider = provider
+        self.add_service(provider)
+
     def init_lease_secret(self):
         def make_secret():
             return idlib.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
@@ -106,7 +117,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
                               }[suffix]
                 sizelimit = int(number) * multiplier
         no_storage = self.get_config("debug_no_storage") is not None
-        self.add_service(StorageServer(storedir, sizelimit, no_storage))
+        self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider))
 
     def init_options(self):
         self.push_to_ourselves = None
index 604b59617e4202bfd62db1f9072131b2a93ddb0f..0ea9d3bdd16b18ae52107119f331822a5e53fb92 100644 (file)
@@ -1309,3 +1309,35 @@ class RIHelper(RemoteInterface):
         will finish and return the upload results.
         """
         return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
+
+
+class RIStatsProvider(RemoteInterface):
+    """
+    Provides access to statistics and monitoring information.
+    """
+    # the docstring must be the first statement in the class body; placed
+    # after this assignment it would be a discarded string expression
+    __remote_name__ = "RIStatsProvider.tahoe.allmydata.com"
+
+    def get_stats():
+        """
+        returns a dictionary containing 'counters' and 'stats', each a dictionary
+        with string counter/stat name keys, and numeric values.  counters are
+        monotonically increasing measures of work done, and stats are instantaneous
+        measures (potentially time averaged internally)
+        """
+        return DictOf(str, DictOf(str, ChoiceOf(float, int, long)))
+
+class RIStatsGatherer(RemoteInterface):
+    """
+    Provides a monitoring service for centralised collection of stats
+    """
+    # the docstring must be the first statement in the class body; placed
+    # after this assignment it would be a discarded string expression
+    __remote_name__ = "RIStatsGatherer.tahoe.allmydata.com"
+
+    def provide(provider=RIStatsProvider, nickname=str):
+        """
+        @param provider: a stats collector instance which should be polled
+                         periodically by the gatherer to collect stats.
+        @param nickname: a name useful to identify the provided client
+        """
+        return None
+
+
diff --git a/src/allmydata/stats.py b/src/allmydata/stats.py
new file mode 100644 (file)
index 0000000..fd6d4b2
--- /dev/null
@@ -0,0 +1,230 @@
+
+import os
+import pickle
+import pprint
+import sys
+import time
+from collections import deque
+
+from twisted.internet import reactor, defer
+from twisted.application import service
+from twisted.application.internet import TimerService
+from zope.interface import implements
+import foolscap
+from foolscap.logging.gatherer import get_local_ip_for
+
+from allmydata.util import log
+from allmydata.interfaces import RIStatsProvider, RIStatsGatherer
+
+class LoadMonitor(service.MultiService):
+    """Measure how late the reactor runs a periodic timer.
+
+    While the service is running, loop() reschedules itself every
+    loop_interval seconds and records how much later than requested each
+    call arrived.  The most recent num_samples delays are retained and
+    summarised by get_stats().
+    """
+    loop_interval = 1   # seconds between samples
+    num_samples = 60    # number of most recent delays retained
+
+    def __init__(self, provider, warn_if_delay_exceeds=1):
+        """
+        @param provider: stored on the instance; not otherwise used by
+                         this class.
+        @param warn_if_delay_exceeds: a measured delay larger than this
+                                      many seconds is logged as unusual.
+        """
+        service.MultiService.__init__(self)
+        self.provider = provider
+        self.warn_if_delay_exceeds = warn_if_delay_exceeds
+        self.running = False
+        self.last = None
+        self.stats = deque()
+        self._timer = None # the pending call to loop(), if any
+
+    def startService(self):
+        if not self.running:
+            self.running = True
+            self._timer = reactor.callLater(self.loop_interval, self.loop)
+        service.MultiService.startService(self)
+
+    def stopService(self):
+        self.running = False
+        # cancel the pending sample so nothing is left in the reactor
+        timer, self._timer = self._timer, None
+        if timer is not None and timer.active():
+            timer.cancel()
+        # forget the last sample time, otherwise the first sample after a
+        # restart would count the whole stopped period as reactor delay
+        self.last = None
+        return service.MultiService.stopService(self)
+
+    def loop(self):
+        """Record one delay sample and schedule the next one."""
+        self._timer = None
+        if not self.running:
+            return
+        now = time.time()
+        if self.last is not None:
+            # how much later than loop_interval this call actually arrived
+            delay = now - self.last - self.loop_interval
+            if delay > self.warn_if_delay_exceeds:
+                log.msg(format='excessive reactor delay (%ss)', args=(delay,),
+                        level=log.UNUSUAL)
+            self.stats.append(delay)
+            while len(self.stats) > self.num_samples:
+                self.stats.popleft()
+
+        self.last = now
+        self._timer = reactor.callLater(self.loop_interval, self.loop)
+
+    def get_stats(self):
+        """Return the average and maximum of the retained delay samples
+        (both 0 when no sample has been taken yet)."""
+        if self.stats:
+            avg = sum(self.stats) / len(self.stats)
+            m_x = max(self.stats)
+        else:
+            avg = m_x = 0
+        return { 'load_monitor.avg_load': avg,
+                 'load_monitor.max_load': m_x, }
+
+class StatsProvider(foolscap.Referenceable, service.MultiService):
+    """Collects named counters and registered stats producers, and
+    reports both to a remote gatherer through remote_get_stats()."""
+    implements(RIStatsProvider)
+
+    def __init__(self, tub, nickname, gatherer_furl):
+        service.MultiService.__init__(self)
+        self.gatherer_furl = gatherer_furl
+
+        self.counters = {}
+        self.stats_producers = []
+
+        # a load monitor is always present, both as a child service and
+        # as the first registered producer
+        monitor = LoadMonitor(self)
+        self.load_monitor = monitor
+        monitor.setServiceParent(self)
+        self.register_producer(monitor)
+
+        if tub:
+            tub.connectTo(gatherer_furl, self._connected_to_gatherer, nickname)
+
+    def count(self, name, delta):
+        """Add delta to the counter called name, starting it at 0."""
+        self.counters[name] = self.counters.setdefault(name, 0) + delta
+
+    def register_producer(self, stats_producer):
+        """Remember an object whose get_stats() is merged into each report."""
+        self.stats_producers.append(stats_producer)
+
+    def remote_get_stats(self):
+        """Return the counters together with the merged output of every
+        registered producer's get_stats()."""
+        gathered = {}
+        for producer in self.stats_producers:
+            gathered.update(producer.get_stats())
+        return { 'counters': self.counters, 'stats': gathered }
+
+    def _connected_to_gatherer(self, gatherer, nickname):
+        # hand ourselves to the gatherer; an unset nickname is sent as ''
+        gatherer.callRemote('provide', self, nickname or '')
+
+class StatsGatherer(foolscap.Referenceable, service.MultiService):
+    """Base class for stats gatherers.
+
+    Providers announce themselves through remote_provide().  Every
+    poll_interval seconds each known provider is asked for its stats, and
+    the reply is passed to got_stats(), which subclasses must implement.
+    """
+    implements(RIStatsGatherer)
+
+    poll_interval = 60
+
+    def __init__(self, tub):
+        service.MultiService.__init__(self)
+        self.tub = tub
+
+        self.clients = {}    # maps tubid to provider reference
+        self.nicknames = {}  # maps tubid to nickname
+
+    def startService(self):
+        self.timer = TimerService(self.poll_interval, self.poll)
+        self.timer.setServiceParent(self)
+        service.MultiService.startService(self)
+
+    def get_furl(self):
+        """Register this gatherer with the tub and return the result."""
+        return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
+
+    def get_tubid(self, rref):
+        """Return the tub ID taken from the URL of the remote reference."""
+        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
+
+    def remote_provide(self, provider, nickname):
+        tubid = self.get_tubid(provider)
+        self.clients[tubid] = provider
+        self.nicknames[tubid] = nickname
+        provider.notifyOnDisconnect(self.lost_client, tubid)
+
+    def lost_client(self, tubid):
+        # A provider that announced itself more than once is notified as
+        # disconnected more than once, so the entry may already be gone.
+        self.clients.pop(tubid, None)
+        self.nicknames.pop(tubid, None)
+
+    def poll(self):
+        """Ask every known provider for its stats."""
+        for tubid,client in self.clients.items():
+            nickname = self.nicknames.get(tubid)
+            d = client.callRemote('get_stats')
+            d.addCallback(self.got_stats, tubid, nickname)
+
+    def got_stats(self, stats, tubid, nickname):
+        """Handle one provider's reply; must be overridden."""
+        raise NotImplementedError()
+
+class StdOutStatsGatherer(StatsGatherer):
+    """Gatherer that reports connects, disconnects and received stats on
+    standard output."""
+
+    def remote_provide(self, provider, nickname):
+        print 'connect "%s" [%s]' % (nickname, self.get_tubid(provider))
+        StatsGatherer.remote_provide(self, provider, nickname)
+
+    def lost_client(self, tubid):
+        # look the nickname up before the base class forgets it
+        nickname = self.nicknames[tubid]
+        print 'disconnect "%s" [%s]:' % (nickname, tubid)
+        StatsGatherer.lost_client(self, tubid)
+
+    def got_stats(self, stats, tubid, nickname):
+        header = '"%s" [%s]:' % (nickname, tubid)
+        print header
+        pprint.pprint(stats)
+
+class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications;
+#class PickleStatsGatherer(StatsGatherer):
+    """Gatherer that keeps the latest stats of every provider in a dict
+    and rewrites a pickle file each time new stats arrive."""
+
+    def __init__(self, tub, picklefile):
+        """
+        @param tub: passed on to StatsGatherer.
+        @param picklefile: path of the pickle file; if it already exists,
+                           its contents are the initial gathered stats.
+        """
+        StatsGatherer.__init__(self, tub)
+        self.picklefile = picklefile
+
+        if os.path.exists(picklefile):
+            f = open(picklefile, 'rb')
+            try:
+                self.gathered_stats = pickle.load(f)
+            finally:
+                f.close()
+        else:
+            self.gathered_stats = {}
+
+    def got_stats(self, stats, tubid, nickname):
+        s = self.gathered_stats.setdefault(tubid, {})
+        s['timestamp'] = time.time()
+        s['nickname'] = nickname
+        s['stats'] = stats
+        self.dump_pickle()
+
+    def dump_pickle(self):
+        """Write the gathered stats to a temporary file, then move it
+        into place as the pickle file."""
+        tmp = "%s.tmp" % (self.picklefile,)
+        f = open(tmp, 'wb')
+        try:
+            pickle.dump(self.gathered_stats, f)
+        finally:
+            f.close()
+        try:
+            # where rename can replace an existing file, readers never
+            # find the pickle file missing
+            os.rename(tmp, self.picklefile)
+        except OSError:
+            # some platforms refuse to rename over an existing file;
+            # only there fall back to removing the old file first
+            if not os.path.exists(self.picklefile):
+                raise
+            os.unlink(self.picklefile)
+            os.rename(tmp, self.picklefile)
+
+class GathererApp(object):
+    """Stand-alone gatherer: sets up a listening Tub in the current
+    directory, attaches a PickleStatsGatherer to it and prints its furl."""
+
+    def __init__(self):
+        d = self.setup_tub()
+        d.addCallback(self._tub_ready)
+
+    def setup_tub(self):
+        """Create and start the Tub, listening on the port recorded in
+        the 'portnum' file (or port 0 if that file is missing or
+        unparseable).  Returns a Deferred that fires with the Tub once
+        its location has been set."""
+        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
+        self._tub.setOption("logLocalFailures", True)
+        self._tub.setOption("logRemoteFailures", True)
+        self._tub.startService()
+        portnumfile = "portnum"
+        try:
+            f = open(portnumfile, "r")
+            try:
+                portnum = int(f.read())
+            finally:
+                f.close()
+        except (EnvironmentError, ValueError):
+            portnum = 0
+        self._tub.listenOn("tcp:%d" % portnum)
+        d = defer.maybeDeferred(get_local_ip_for)
+        d.addCallback(self._set_location)
+        d.addCallback(lambda res: self._tub)
+        return d
+
+    def _set_location(self, local_address):
+        # advertise the discovered address (if any) followed by loopback
+        if local_address is None:
+            local_addresses = ["127.0.0.1"]
+        else:
+            local_addresses = [local_address, "127.0.0.1"]
+        l = self._tub.getListeners()[0]
+        portnum = l.getPortnum()
+        # record the port in use so the next run asks for the same one
+        portnumfile = "portnum"
+        f = open(portnumfile, "w")
+        try:
+            f.write("%d\n" % portnum)
+        finally:
+            f.close()
+        local_addresses = [ "%s:%d" % (addr, portnum,)
+                            for addr in local_addresses ]
+        assert len(local_addresses) >= 1
+        location = ",".join(local_addresses)
+        self._tub.setLocation(location)
+
+    def _tub_ready(self, tub):
+        sg = PickleStatsGatherer(tub, 'stats.pickle')
+        sg.setServiceParent(tub)
+        sg.verbose = True
+        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
+
+def main(argv):
+    """Create the gatherer application and run the reactor.
+
+    argv is accepted but not used.
+    """
+    ga = GathererApp()
+    reactor.run()
+
+if __name__ == '__main__':
+    main(sys.argv)
index 487d63af9df845d9d9a59a339990eb5f70b09ccb..b0be6b1be96dc875d1687e1a6bc7636624d00fc5 100644 (file)
@@ -659,7 +659,7 @@ class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
 
-    def __init__(self, storedir, sizelimit=None, no_storage=False):
+    def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None):
         service.MultiService.__init__(self)
         self.storedir = storedir
         sharedir = os.path.join(storedir, "shares")
@@ -667,6 +667,9 @@ class StorageServer(service.MultiService, Referenceable):
         self.sharedir = sharedir
         self.sizelimit = sizelimit
         self.no_storage = no_storage
+        self.stats_provider = stats_provider
+        if self.stats_provider:
+            self.stats_provider.register_producer(self)
         self.incomingdir = os.path.join(sharedir, 'incoming')
         self._clean_incomplete()
         fileutil.make_dirs(self.incomingdir)
@@ -693,6 +696,11 @@ class StorageServer(service.MultiService, Referenceable):
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)
 
+    def get_stats(self):
+        """Return this server's consumed and allocated sizes, keyed by
+        stat name."""
+        stats = {}
+        stats['storage_server.consumed'] = self.consumed
+        stats['storage_server.allocated'] = self.allocated_size()
+        return stats
+
     def measure_size(self):
         self.consumed = fileutil.du(self.sharedir)
 
@@ -821,11 +829,15 @@ class StorageServer(service.MultiService, Referenceable):
         if not remaining_files:
             fileutil.rm_dir(storagedir)
         self.consumed -= total_space_freed
+        if self.stats_provider:
+            self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
         if not found_buckets:
             raise IndexError("no such lease to cancel")
 
     def bucket_writer_closed(self, bw, consumed_size):
         self.consumed += consumed_size
+        if self.stats_provider:
+            self.stats_provider.count('storage_server.bytes_added', consumed_size)
         del self._active_writers[bw]
 
     def _get_bucket_shares(self, storage_index):