From: robk-tahoe Date: Thu, 31 Jan 2008 03:11:07 +0000 (-0700) Subject: stats: add a simple stats gathering system X-Git-Url: https://git.rkrishnan.org/specifications/components/frontends/CLI.txt?a=commitdiff_plain;h=7b9f3207d0447cf2a371d8a72ea3ed5ee167e9a9;p=tahoe-lafs%2Ftahoe-lafs.git stats: add a simple stats gathering system We have a desire to collect runtime statistics from multiple nodes primarily for server monitoring purposes. This implements a simple implementation of such a system, as a skeleton to build more sophistication upon. Each client now looks for a 'stats_gatherer.furl' config file. If it has been configured to use a stats gatherer, then it instantiates internally a StatsProvider. This is a central place for code which wishes to offer stats up for monitoring to report them to, either by calling stats_provider.count('stat.name', value) to increment a counter, or by registering a class as a stats producer with sp.register_producer(obj). The StatsProvider connects to the StatsGatherer server and provides its provider upon startup. The StatsGatherer is then responsible for polling the attached providers periodically to retrieve the data provided. The provider queries each registered producer when the gatherer queries the provider. Both the internal 'counters' and the queried 'stats' are then reported to the gatherer. This provides a simple gatherer app, (c.f. make stats-gatherer-run) which prints its furl and listens for incoming connections. Once a minute, the gatherer polls all connected providers, and writes the retrieved data into a pickle file. Also included is a munin plugin which knows how to read the gatherer's stats.pickle and output data munin can interpret. this plugin, tahoe-stats.py can be symlinked as multiple different names within munin's 'plugins' directory, and inspects argv to determine which data to display, doing a lookup in a table within that file. It looks in the environment for 'statsfile' to determine the path to the gatherer's stats.pickle. An example plugins-conf.d file is provided. --- diff --git a/Makefile b/Makefile index 8600bbd0..7150a1b5 100644 --- a/Makefile +++ b/Makefile @@ -440,3 +440,8 @@ mac-cleanup: mac-dbg: cd mac && $(PP) $(PYTHON)w allmydata_tahoe.py +# This target runs a stats gatherer server +.PHONY: stats-gatherer-run +stats-gatherer-run: + cd stats_gatherer && $(PP) $(PYTHON) ../src/allmydata/stats.py + diff --git a/misc/munin/tahoe-stats.plugin-conf b/misc/munin/tahoe-stats.plugin-conf new file mode 100644 index 00000000..2084c65f --- /dev/null +++ b/misc/munin/tahoe-stats.plugin-conf @@ -0,0 +1,12 @@ +[tahoe_storage_allocated] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle +[tahoe_storage_consumed] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle +[tahoe_runtime_load_avg] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle +[tahoe_runtime_load_peak] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle +[tahoe_storage_bytes_added] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle +[tahoe_storage_bytes_freed] +env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle diff --git a/misc/munin/tahoe-stats.py b/misc/munin/tahoe-stats.py new file mode 100644 index 00000000..1f09b3c3 --- /dev/null +++ b/misc/munin/tahoe-stats.py @@ -0,0 +1,145 @@ +#!/usr/bin/python + +import os +import pickle +import re +import sys + +PLUGINS = { + 'tahoe_storage_consumed': + { 'statid': 'storage_server.consumed', + 'category': 'stats', + 'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Consumed', + 'graph_vlabel bytes', + 'graph_category tahoe_storage_server', + 'graph_info This graph shows space consumed', + 'graph_args --base 1024', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + 'tahoe_storage_allocated': + { 'statid': 'storage_server.allocated', + 'category': 'stats', + 'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Allocated', + 'graph_vlabel bytes', + 'graph_category tahoe_storage_server', + 'graph_info This graph shows space allocated', + 'graph_args --base 1024', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + + 'tahoe_runtime_load_avg': + { 'statid': 'load_monitor.avg_load', + 'category': 'stats', + 'configheader': '\n'.join(['graph_title Tahoe Runtime Load Average', + 'graph_vlabel load', + 'graph_category tahoe', + 'graph_info This graph shows average reactor delay', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + 'tahoe_runtime_load_peak': + { 'statid': 'load_monitor.max_load', + 'category': 'stats', + 'configheader': '\n'.join(['graph_title Tahoe Runtime Load Peak', + 'graph_vlabel load', + 'graph_category tahoe', + 'graph_info This graph shows peak reactor delay', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + + 'tahoe_storage_bytes_added': + { 'statid': 'storage_server.bytes_added', + 'category': 'counters', + 'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Added', + 'graph_vlabel bytes', + 'graph_category tahoe_storage_server', + 'graph_info This graph shows cummulative bytes added', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + 'tahoe_storage_bytes_freed': + { 'statid': 'storage_server.bytes_freed', + 'category': 'counters', + 'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Removed', + 'graph_vlabel bytes', + 'graph_category tahoe_storage_server', + 'graph_info This graph shows cummulative bytes removed', + ]), + 'graph_config': '\n'.join(['%(name)s.label %(name)s', + '%(name)s.draw LINE1', + ]), + 'graph_render': '\n'.join(['%(name)s.value %(value)s', + ]), + }, + + } + +def smash_name(name): + return re.sub('[^a-zA-Z0-9]', '_', name) + +def open_stats(fname): + f = open(fname, 'rb') + stats = pickle.load(f) + f.close() + return stats + +def main(argv): + graph_name = os.path.basename(argv[0]) + if graph_name.endswith('.py'): + graph_name = graph_name[:-3] + + plugin_conf = PLUGINS.get(graph_name) + + for k,v in os.environ.items(): + if k.startswith('statsfile'): + stats_file = v + break + else: + raise RuntimeError("No 'statsfile' env var found") + + stats = open_stats(stats_file) + + def output_nodes(output_section): + for tubid, nodestats in stats.items(): + name = smash_name("%s_%s" % (nodestats['nickname'], tubid[:8])) + #value = nodestats['stats'][plugin_conf['category']].get(plugin_conf['statid']) + category = plugin_conf['category'] + statid = plugin_conf['statid'] + value = nodestats['stats'][category].get(statid) + if value is not None: + args = { 'name': name, 'value': value } + print plugin_conf[output_section] % args + + if len(argv) > 1: + if sys.argv[1] == 'config': + print plugin_conf['configheader'] + output_nodes('graph_config') + sys.exit(0) + + output_nodes('graph_render') + +if __name__ == '__main__': + main(sys.argv) diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 2ef50cce..65a49a58 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -21,6 +21,7 @@ from allmydata.util import hashutil, idlib, testutil from allmydata.filenode import FileNode from allmydata.dirnode import NewDirectoryNode from allmydata.mutable import MutableFileNode +from allmydata.stats import StatsProvider from allmydata.interfaces import IURI, INewDirectoryURI, \ IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI @@ -56,6 +57,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.logSource="Client" self.my_furl = None self.introducer_client = None + self.init_stats_provider() self.init_lease_secret() self.init_storage() self.init_options() @@ -79,6 +81,15 @@ class Client(node.Node, Referenceable, testutil.PollMixin): if webport: self.init_web(webport) # strports string + def init_stats_provider(self): + gatherer_furl = self.get_config('stats_gatherer.furl') + if gatherer_furl: + nickname = self.get_config('nickname') + self.stats_provider = StatsProvider(self.tub, nickname, gatherer_furl) + self.add_service(self.stats_provider) + else: + self.stats_provider = None + def init_lease_secret(self): def make_secret(): return idlib.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" @@ -106,7 +117,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin): }[suffix] sizelimit = int(number) * multiplier no_storage = self.get_config("debug_no_storage") is not None - self.add_service(StorageServer(storedir, sizelimit, no_storage)) + self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider)) def init_options(self): self.push_to_ourselves = None diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 604b5961..0ea9d3bd 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -1309,3 +1309,35 @@ class RIHelper(RemoteInterface): will finish and return the upload results. """ return (UploadResults, ChoiceOf(RICHKUploadHelper, None)) + + +class RIStatsProvider(RemoteInterface): + __remote_name__ = "RIStatsProvider.tahoe.allmydata.com" + """ + Provides access to statistics and monitoring information. + """ + + def get_stats(): + """ + returns a dictionary containing 'counters' and 'stats', each a dictionary + with string counter/stat name keys, and numeric values. counters are + monotonically increasing measures of work done, and stats are instantaneous + measures (potentially time averaged internally) + """ + return DictOf(str, DictOf(str, ChoiceOf(float, int, long))) + +class RIStatsGatherer(RemoteInterface): + __remote_name__ = "RIStatsGatherer.tahoe.allmydata.com" + """ + Provides a monitoring service for centralised collection of stats + """ + + def provide(provider=RIStatsProvider, nickname=str): + """ + @param provider: a stats collector instance which should be polled + periodically by the gatherer to collect stats. + @param nickname: a name useful to identify the provided client + """ + return None + + diff --git a/src/allmydata/stats.py b/src/allmydata/stats.py new file mode 100644 index 00000000..fd6d4b27 --- /dev/null +++ b/src/allmydata/stats.py @@ -0,0 +1,230 @@ + +import os +import pickle +import pprint +import sys +import time +from collections import deque + +from twisted.internet import reactor, defer +from twisted.application import service +from twisted.application.internet import TimerService +from zope.interface import implements +import foolscap +from foolscap.logging.gatherer import get_local_ip_for + +from allmydata.util import log +from allmydata.interfaces import RIStatsProvider, RIStatsGatherer + +class LoadMonitor(service.MultiService): + loop_interval = 1 + num_samples = 60 + + def __init__(self, provider, warn_if_delay_exceeds=1): + service.MultiService.__init__(self) + self.provider = provider + self.warn_if_delay_exceeds = warn_if_delay_exceeds + self.running = False + self.last = None + self.stats = deque() + + def startService(self): + if not self.running: + self.running = True + reactor.callLater(self.loop_interval, self.loop) + service.MultiService.startService(self) + + def stopService(self): + self.running = False + + def loop(self): + if not self.running: + return + now = time.time() + if self.last is not None: + delay = now - self.last - self.loop_interval + if delay > self.warn_if_delay_exceeds: + log.msg(format='excessive reactor delay (%ss)', args=(delay,), + level=log.UNUSUAL) + self.stats.append(delay) + while len(self.stats) > self.num_samples: + self.stats.popleft() + + self.last = now + reactor.callLater(self.loop_interval, self.loop) + + def get_stats(self): + if self.stats: + avg = sum(self.stats) / len(self.stats) + m_x = max(self.stats) + else: + avg = m_x = 0 + return { 'load_monitor.avg_load': avg, + 'load_monitor.max_load': m_x, } + +class StatsProvider(foolscap.Referenceable, service.MultiService): + implements(RIStatsProvider) + + def __init__(self, tub, nickname, gatherer_furl): + service.MultiService.__init__(self) + self.gatherer_furl = gatherer_furl + + self.counters = {} + self.stats_producers = [] + + self.load_monitor = LoadMonitor(self) + self.load_monitor.setServiceParent(self) + self.register_producer(self.load_monitor) + + if tub: + tub.connectTo(gatherer_furl, self._connected_to_gatherer, nickname) + + def count(self, name, delta): + val = self.counters.setdefault(name, 0) + self.counters[name] = val + delta + + def register_producer(self, stats_producer): + self.stats_producers.append(stats_producer) + + def remote_get_stats(self): + stats = {} + for sp in self.stats_producers: + stats.update(sp.get_stats()) + return { 'counters': self.counters, 'stats': stats } + + def _connected_to_gatherer(self, gatherer, nickname): + gatherer.callRemote('provide', self, nickname or '') + +class StatsGatherer(foolscap.Referenceable, service.MultiService): + implements(RIStatsGatherer) + + poll_interval = 60 + + def __init__(self, tub): + service.MultiService.__init__(self) + self.tub = tub + + self.clients = {} + self.nicknames = {} + + def startService(self): + self.timer = TimerService(self.poll_interval, self.poll) + self.timer.setServiceParent(self) + service.MultiService.startService(self) + + def get_furl(self): + return self.tub.registerReference(self, furlFile='stats_gatherer.furl') + + def get_tubid(self, rref): + return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID() + + def remote_provide(self, provider, nickname): + tubid = self.get_tubid(provider) + self.clients[tubid] = provider + self.nicknames[tubid] = nickname + provider.notifyOnDisconnect(self.lost_client, tubid) + + def lost_client(self, tubid): + del self.clients[tubid] + del self.nicknames[tubid] + + def poll(self): + for tubid,client in self.clients.items(): + nickname = self.nicknames.get(tubid) + d = client.callRemote('get_stats') + d.addCallback(self.got_stats, tubid, nickname) + + def got_stats(self, stats, tubid, nickname): + raise NotImplementedError() + +class StdOutStatsGatherer(StatsGatherer): + def remote_provide(self, provider, nickname): + tubid = self.get_tubid(provider) + print 'connect "%s" [%s]' % (nickname, tubid) + StatsGatherer.remote_provide(self, provider, nickname) + + def lost_client(self, tubid): + print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid) + StatsGatherer.lost_client(self, tubid) + + def got_stats(self, stats, tubid, nickname): + print '"%s" [%s]:' % (nickname, tubid) + pprint.pprint(stats) + +class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications; +#class PickleStatsGatherer(StatsGatherer): + def __init__(self, tub, picklefile): + StatsGatherer.__init__(self, tub) + self.picklefile = picklefile + + if os.path.exists(picklefile): + f = open(picklefile, 'rb') + self.gathered_stats = pickle.load(f) + f.close() + else: + self.gathered_stats = {} + + def got_stats(self, stats, tubid, nickname): + s = self.gathered_stats.setdefault(tubid, {}) + s['timestamp'] = time.time() + s['nickname'] = nickname + s['stats'] = stats + self.dump_pickle() + + def dump_pickle(self): + tmp = "%s.tmp" % (self.picklefile,) + f = open(tmp, 'wb') + pickle.dump(self.gathered_stats, f) + f.close() + if os.path.exists(self.picklefile): + os.unlink(self.picklefile) + os.rename(tmp, self.picklefile) + +class GathererApp(object): + def __init__(self): + d = self.setup_tub() + d.addCallback(self._tub_ready) + + def setup_tub(self): + self._tub = foolscap.Tub(certFile="stats_gatherer.pem") + self._tub.setOption("logLocalFailures", True) + self._tub.setOption("logRemoteFailures", True) + self._tub.startService() + portnumfile = "portnum" + try: + portnum = int(open(portnumfile, "r").read()) + except (EnvironmentError, ValueError): + portnum = 0 + self._tub.listenOn("tcp:%d" % portnum) + d = defer.maybeDeferred(get_local_ip_for) + d.addCallback(self._set_location) + d.addCallback(lambda res: self._tub) + return d + + def _set_location(self, local_address): + if local_address is None: + local_addresses = ["127.0.0.1"] + else: + local_addresses = [local_address, "127.0.0.1"] + l = self._tub.getListeners()[0] + portnum = l.getPortnum() + portnumfile = "portnum" + open(portnumfile, "w").write("%d\n" % portnum) + local_addresses = [ "%s:%d" % (addr, portnum,) + for addr in local_addresses ] + assert len(local_addresses) >= 1 + location = ",".join(local_addresses) + self._tub.setLocation(location) + + def _tub_ready(self, tub): + sg = PickleStatsGatherer(tub, 'stats.pickle') + sg.setServiceParent(tub) + sg.verbose = True + print '\nStatsGatherer: %s\n' % (sg.get_furl(),) + +def main(argv): + ga = GathererApp() + reactor.run() + +if __name__ == '__main__': + main(sys.argv) diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 487d63af..b0be6b1b 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -659,7 +659,7 @@ class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) name = 'storageserver' - def __init__(self, storedir, sizelimit=None, no_storage=False): + def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None): service.MultiService.__init__(self) self.storedir = storedir sharedir = os.path.join(storedir, "shares") @@ -667,6 +667,9 @@ class StorageServer(service.MultiService, Referenceable): self.sharedir = sharedir self.sizelimit = sizelimit self.no_storage = no_storage + self.stats_provider = stats_provider + if self.stats_provider: + self.stats_provider.register_producer(self) self.incomingdir = os.path.join(sharedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) @@ -693,6 +696,11 @@ class StorageServer(service.MultiService, Referenceable): def _clean_incomplete(self): fileutil.rm_dir(self.incomingdir) + def get_stats(self): + return { 'storage_server.consumed': self.consumed, + 'storage_server.allocated': self.allocated_size(), + } + def measure_size(self): self.consumed = fileutil.du(self.sharedir) @@ -821,11 +829,15 @@ class StorageServer(service.MultiService, Referenceable): if not remaining_files: fileutil.rm_dir(storagedir) self.consumed -= total_space_freed + if self.stats_provider: + self.stats_provider.count('storage_server.bytes_freed', total_space_freed) if not found_buckets: raise IndexError("no such lease to cancel") def bucket_writer_closed(self, bw, consumed_size): self.consumed += consumed_size + if self.stats_provider: + self.stats_provider.count('storage_server.bytes_added', consumed_size) del self._active_writers[bw] def _get_bucket_shares(self, storage_index):