mac-dbg:
cd mac && $(PP) $(PYTHON)w allmydata_tahoe.py
+# This target runs a stats gatherer server
+.PHONY: stats-gatherer-run
+stats-gatherer-run:
+ cd stats_gatherer && $(PP) $(PYTHON) ../src/allmydata/stats.py
+
--- /dev/null
+[tahoe_storage_allocated]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_consumed]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_runtime_load_avg]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_runtime_load_peak]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_bytes_added]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
+[tahoe_storage_bytes_freed]
+env.statsfile /home/robk/trees/tahoe/stats_gatherer/stats.pickle
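Munin runs each section above as an executable of the same name (conventionally
a symlink to the plugin script below) and exports each env.* setting as an
environment variable. A hypothetical smoke test, assuming the symlink lives in
the current directory:

    import os
    import subprocess

    # 'statsfile' is what munin exports from the env.statsfile lines above
    env = dict(os.environ, statsfile='/home/robk/trees/tahoe/stats_gatherer/stats.pickle')
    subprocess.call(['./tahoe_storage_allocated', 'config'], env=env)  # graph metadata
    subprocess.call(['./tahoe_storage_allocated'], env=env)            # current values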
--- /dev/null
+#!/usr/bin/python
+
+import os
+import pickle
+import re
+import sys
+
+PLUGINS = {
+ 'tahoe_storage_consumed':
+ { 'statid': 'storage_server.consumed',
+ 'category': 'stats',
+ 'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Consumed',
+ 'graph_vlabel bytes',
+ 'graph_category tahoe_storage_server',
+ 'graph_info This graph shows space consumed',
+ 'graph_args --base 1024',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+ 'tahoe_storage_allocated':
+ { 'statid': 'storage_server.allocated',
+ 'category': 'stats',
+ 'configheader': '\n'.join(['graph_title Tahoe Storage Server Space Allocated',
+ 'graph_vlabel bytes',
+ 'graph_category tahoe_storage_server',
+ 'graph_info This graph shows space allocated',
+ 'graph_args --base 1024',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+
+ 'tahoe_runtime_load_avg':
+ { 'statid': 'load_monitor.avg_load',
+ 'category': 'stats',
+ 'configheader': '\n'.join(['graph_title Tahoe Runtime Load Average',
+ 'graph_vlabel load',
+ 'graph_category tahoe',
+ 'graph_info This graph shows average reactor delay',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+ 'tahoe_runtime_load_peak':
+ { 'statid': 'load_monitor.max_load',
+ 'category': 'stats',
+ 'configheader': '\n'.join(['graph_title Tahoe Runtime Load Peak',
+ 'graph_vlabel load',
+ 'graph_category tahoe',
+ 'graph_info This graph shows peak reactor delay',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+
+ 'tahoe_storage_bytes_added':
+ { 'statid': 'storage_server.bytes_added',
+ 'category': 'counters',
+ 'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Added',
+ 'graph_vlabel bytes',
+ 'graph_category tahoe_storage_server',
+ 'graph_info This graph shows cumulative bytes added',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+ 'tahoe_storage_bytes_freed':
+ { 'statid': 'storage_server.bytes_freed',
+ 'category': 'counters',
+ 'configheader': '\n'.join(['graph_title Tahoe Storage Server Bytes Removed',
+ 'graph_vlabel bytes',
+ 'graph_category tahoe_storage_server',
+ 'graph_info This graph shows cumulative bytes removed',
+ ]),
+ 'graph_config': '\n'.join(['%(name)s.label %(name)s',
+ '%(name)s.draw LINE1',
+ ]),
+ 'graph_render': '\n'.join(['%(name)s.value %(value)s',
+ ]),
+ },
+
+ }
+
+def smash_name(name):
+ return re.sub('[^a-zA-Z0-9]', '_', name)
+
+def open_stats(fname):
+ f = open(fname, 'rb')
+ stats = pickle.load(f)
+ f.close()
+ return stats
+
+def main(argv):
+ graph_name = os.path.basename(argv[0])
+ if graph_name.endswith('.py'):
+ graph_name = graph_name[:-3]
+
+ plugin_conf = PLUGINS.get(graph_name)
+ if plugin_conf is None:
+ raise RuntimeError("unknown plugin: %r" % (graph_name,))
+
+ stats_file = os.environ.get('statsfile')
+ if not stats_file:
+ raise RuntimeError("No 'statsfile' env var found")
+
+ stats = open_stats(stats_file)
+
+ def output_nodes(output_section):
+ for tubid, nodestats in stats.items():
+ name = smash_name("%s_%s" % (nodestats['nickname'], tubid[:8]))
+ category = plugin_conf['category']
+ statid = plugin_conf['statid']
+ value = nodestats['stats'][category].get(statid)
+ if value is not None:
+ args = { 'name': name, 'value': value }
+ print plugin_conf[output_section] % args
+
+ if len(argv) > 1:
+ if argv[1] == 'config':
+ print plugin_conf['configheader']
+ output_nodes('graph_config')
+ sys.exit(0)
+
+ output_nodes('graph_render')
+
+if __name__ == '__main__':
+ main(sys.argv)
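For reference, the plugin above expects stats.pickle to map tubids to per-node
records, matching what PickleStatsGatherer.got_stats (later in this patch)
writes. A sketch of the shape, with illustrative values:

    # keyed by tubid; 'stats' holds the dict returned by remote_get_stats()
    {'v5mf4vjvqrvz...': {'timestamp': 1200000000.0,
                         'nickname': 'node1',
                         'stats': {'counters': {'storage_server.bytes_added': 1024},
                                   'stats': {'storage_server.consumed': 2048,
                                             'load_monitor.avg_load': 0.01}}}}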
from allmydata.filenode import FileNode
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable import MutableFileNode
+from allmydata.stats import StatsProvider
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
self.logSource="Client"
self.my_furl = None
self.introducer_client = None
+ self.init_stats_provider()
self.init_lease_secret()
self.init_storage()
self.init_options()
if webport:
self.init_web(webport) # strports string
+ def init_stats_provider(self):
+ gatherer_furl = self.get_config('stats_gatherer.furl')
+ if gatherer_furl:
+ nickname = self.get_config('nickname')
+ self.stats_provider = StatsProvider(self.tub, nickname, gatherer_furl)
+ self.add_service(self.stats_provider)
+ else:
+ self.stats_provider = None
+
def init_lease_secret(self):
def make_secret():
return idlib.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
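In this era of the codebase Client.get_config reads BASEDIR/&lt;name&gt;, so
(assuming that behaviour) a node starts reporting stats once the gatherer's
FURL is written into its base directory, e.g. with a hypothetical helper:

    import os

    def enable_stats_reporting(basedir, gatherer_furl):
        # write BASEDIR/stats_gatherer.furl, read by init_stats_provider()
        f = open(os.path.join(basedir, 'stats_gatherer.furl'), 'w')
        f.write(gatherer_furl + '\n')
        f.close()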
}[suffix]
sizelimit = int(number) * multiplier
no_storage = self.get_config("debug_no_storage") is not None
- self.add_service(StorageServer(storedir, sizelimit, no_storage))
+ self.add_service(StorageServer(storedir, sizelimit, no_storage, self.stats_provider))
def init_options(self):
self.push_to_ourselves = None
will finish and return the upload results.
"""
return (UploadResults, ChoiceOf(RICHKUploadHelper, None))
+
+
+class RIStatsProvider(RemoteInterface):
+ """
+ Provides access to statistics and monitoring information.
+ """
+ __remote_name__ = "RIStatsProvider.tahoe.allmydata.com"
+
+ def get_stats():
+ """
+ returns a dictionary containing 'counters' and 'stats', each a dictionary
+ with string counter/stat name keys, and numeric values. counters are
+ monotonically increasing measures of work done, and stats are instantaneous
+ measures (potentially time averaged internally)
+ """
+ return DictOf(str, DictOf(str, ChoiceOf(float, int, long)))
+
+class RIStatsGatherer(RemoteInterface):
+ """
+ Provides a monitoring service for centralised collection of stats.
+ """
+ __remote_name__ = "RIStatsGatherer.tahoe.allmydata.com"
+
+ def provide(provider=RIStatsProvider, nickname=str):
+ """
+ @param provider: a stats provider instance that the gatherer should
+ poll periodically to collect stats.
+ @param nickname: a name used to identify the providing client
+ """
+ return None
+
+
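Anything with a get_stats() method returning a dict of this shape can feed the
provider; a minimal sketch (hypothetical names, not part of this patch) of a
producer registered via StatsProvider.register_producer:

    import time

    class UptimeProducer:
        def __init__(self):
            self.start_time = time.time()
        def get_stats(self):
            # instantaneous measure, so it belongs under 'stats'
            return {'uptime_monitor.uptime': time.time() - self.start_time}

    # e.g. client.stats_provider.register_producer(UptimeProducer())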
--- /dev/null
+
+import os
+import pickle
+import pprint
+import sys
+import time
+from collections import deque
+
+from twisted.internet import reactor, defer
+from twisted.application import service
+from twisted.application.internet import TimerService
+from zope.interface import implements
+import foolscap
+from foolscap.logging.gatherer import get_local_ip_for
+
+from allmydata.util import log
+from allmydata.interfaces import RIStatsProvider, RIStatsGatherer
+
+class LoadMonitor(service.MultiService):
+ loop_interval = 1
+ num_samples = 60
+
+ def __init__(self, provider, warn_if_delay_exceeds=1):
+ service.MultiService.__init__(self)
+ self.provider = provider
+ self.warn_if_delay_exceeds = warn_if_delay_exceeds
+ self.running = False
+ self.last = None
+ self.stats = deque()
+
+ def startService(self):
+ if not self.running:
+ self.running = True
+ reactor.callLater(self.loop_interval, self.loop)
+ service.MultiService.startService(self)
+
+ def stopService(self):
+ self.running = False
+ return service.MultiService.stopService(self)
+
+ def loop(self):
+ if not self.running:
+ return
+ now = time.time()
+ if self.last is not None:
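+ # how much later than scheduled did this call fire? (reactor delay)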
+ delay = now - self.last - self.loop_interval
+ if delay > self.warn_if_delay_exceeds:
+ log.msg(format='excessive reactor delay (%ss)', args=(delay,),
+ level=log.UNUSUAL)
+ self.stats.append(delay)
+ while len(self.stats) > self.num_samples:
+ self.stats.popleft()
+
+ self.last = now
+ reactor.callLater(self.loop_interval, self.loop)
+
+ def get_stats(self):
+ if self.stats:
+ avg = sum(self.stats) / len(self.stats)
+ m_x = max(self.stats)
+ else:
+ avg = m_x = 0
+ return { 'load_monitor.avg_load': avg,
+ 'load_monitor.max_load': m_x, }
+
+class StatsProvider(foolscap.Referenceable, service.MultiService):
+ implements(RIStatsProvider)
+
+ def __init__(self, tub, nickname, gatherer_furl):
+ service.MultiService.__init__(self)
+ self.gatherer_furl = gatherer_furl
+
+ self.counters = {}
+ self.stats_producers = []
+
+ self.load_monitor = LoadMonitor(self)
+ self.load_monitor.setServiceParent(self)
+ self.register_producer(self.load_monitor)
+
+ if tub:
+ tub.connectTo(gatherer_furl, self._connected_to_gatherer, nickname)
+
+ def count(self, name, delta):
+ val = self.counters.setdefault(name, 0)
+ self.counters[name] = val + delta
+
+ def register_producer(self, stats_producer):
+ self.stats_producers.append(stats_producer)
+
+ def remote_get_stats(self):
+ stats = {}
+ for sp in self.stats_producers:
+ stats.update(sp.get_stats())
+ return { 'counters': self.counters, 'stats': stats }
+
+ def _connected_to_gatherer(self, gatherer, nickname):
+ gatherer.callRemote('provide', self, nickname or '')
+
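+# Example (hypothetical, not part of this patch): counters and producers can
+# be exercised locally without a tub or a gatherer:
+#   provider = StatsProvider(tub=None, nickname='example', gatherer_furl=None)
+#   provider.count('example.events', 1)
+#   print provider.remote_get_stats()
+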
+class StatsGatherer(foolscap.Referenceable, service.MultiService):
+ implements(RIStatsGatherer)
+
+ poll_interval = 60
+
+ def __init__(self, tub):
+ service.MultiService.__init__(self)
+ self.tub = tub
+
+ self.clients = {}
+ self.nicknames = {}
+
+ def startService(self):
+ self.timer = TimerService(self.poll_interval, self.poll)
+ self.timer.setServiceParent(self)
+ service.MultiService.startService(self)
+
+ def get_furl(self):
+ return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
+
+ def get_tubid(self, rref):
+ return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
+
+ def remote_provide(self, provider, nickname):
+ tubid = self.get_tubid(provider)
+ self.clients[tubid] = provider
+ self.nicknames[tubid] = nickname
+ provider.notifyOnDisconnect(self.lost_client, tubid)
+
+ def lost_client(self, tubid):
+ del self.clients[tubid]
+ del self.nicknames[tubid]
+
+ def poll(self):
+ for tubid,client in self.clients.items():
+ nickname = self.nicknames.get(tubid)
+ d = client.callRemote('get_stats')
+ d.addCallback(self.got_stats, tubid, nickname)
+
+ def got_stats(self, stats, tubid, nickname):
+ raise NotImplementedError()
+
+class StdOutStatsGatherer(StatsGatherer):
+ def remote_provide(self, provider, nickname):
+ tubid = self.get_tubid(provider)
+ print 'connect "%s" [%s]' % (nickname, tubid)
+ StatsGatherer.remote_provide(self, provider, nickname)
+
+ def lost_client(self, tubid):
+ print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
+ StatsGatherer.lost_client(self, tubid)
+
+ def got_stats(self, stats, tubid, nickname):
+ print '"%s" [%s]:' % (nickname, tubid)
+ pprint.pprint(stats)
+
+# Inherits from StdOutStatsGatherer to also print connect/disconnect
+# notifications; inherit directly from StatsGatherer to silence them.
+class PickleStatsGatherer(StdOutStatsGatherer):
+ def __init__(self, tub, picklefile):
+ StatsGatherer.__init__(self, tub)
+ self.picklefile = picklefile
+
+ if os.path.exists(picklefile):
+ f = open(picklefile, 'rb')
+ self.gathered_stats = pickle.load(f)
+ f.close()
+ else:
+ self.gathered_stats = {}
+
+ def got_stats(self, stats, tubid, nickname):
+ s = self.gathered_stats.setdefault(tubid, {})
+ s['timestamp'] = time.time()
+ s['nickname'] = nickname
+ s['stats'] = stats
+ self.dump_pickle()
+
+ def dump_pickle(self):
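+ # write to a temp file and rename into place, so an interrupted write
+ # cannot corrupt the existing pickle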
+ tmp = "%s.tmp" % (self.picklefile,)
+ f = open(tmp, 'wb')
+ pickle.dump(self.gathered_stats, f)
+ f.close()
+ if os.path.exists(self.picklefile):
+ os.unlink(self.picklefile)
+ os.rename(tmp, self.picklefile)
+
+class GathererApp(object):
+ def __init__(self):
+ d = self.setup_tub()
+ d.addCallback(self._tub_ready)
+
+ def setup_tub(self):
+ self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
+ self._tub.setOption("logLocalFailures", True)
+ self._tub.setOption("logRemoteFailures", True)
+ self._tub.startService()
+ portnumfile = "portnum"
+ try:
+ portnum = int(open(portnumfile, "r").read())
+ except (EnvironmentError, ValueError):
+ portnum = 0
+ self._tub.listenOn("tcp:%d" % portnum)
+ d = defer.maybeDeferred(get_local_ip_for)
+ d.addCallback(self._set_location)
+ d.addCallback(lambda res: self._tub)
+ return d
+
+ def _set_location(self, local_address):
+ if local_address is None:
+ local_addresses = ["127.0.0.1"]
+ else:
+ local_addresses = [local_address, "127.0.0.1"]
+ l = self._tub.getListeners()[0]
+ portnum = l.getPortnum()
+ portnumfile = "portnum"
+ open(portnumfile, "w").write("%d\n" % portnum)
+ local_addresses = [ "%s:%d" % (addr, portnum,)
+ for addr in local_addresses ]
+ assert len(local_addresses) >= 1
+ location = ",".join(local_addresses)
+ self._tub.setLocation(location)
+
+ def _tub_ready(self, tub):
+ sg = PickleStatsGatherer(tub, 'stats.pickle')
+ sg.setServiceParent(tub)
+ sg.verbose = True
+ print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
+
+def main(argv):
+ ga = GathererApp()
+ reactor.run()
+
+if __name__ == '__main__':
+ main(sys.argv)
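Once the gatherer has polled a node or two, the pickle can be inspected
directly (a minimal sketch, assuming it is run from the gatherer's working
directory):

    import pickle
    import pprint

    f = open('stats.pickle', 'rb')
    pprint.pprint(pickle.load(f))
    f.close()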
implements(RIStorageServer)
name = 'storageserver'
- def __init__(self, storedir, sizelimit=None, no_storage=False):
+ def __init__(self, storedir, sizelimit=None, no_storage=False, stats_provider=None):
service.MultiService.__init__(self)
self.storedir = storedir
sharedir = os.path.join(storedir, "shares")
self.sharedir = sharedir
self.sizelimit = sizelimit
self.no_storage = no_storage
+ self.stats_provider = stats_provider
+ if self.stats_provider:
+ self.stats_provider.register_producer(self)
self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
+ def get_stats(self):
+ return { 'storage_server.consumed': self.consumed,
+ 'storage_server.allocated': self.allocated_size(),
+ }
+
def measure_size(self):
self.consumed = fileutil.du(self.sharedir)
if not remaining_files:
fileutil.rm_dir(storagedir)
self.consumed -= total_space_freed
+ if self.stats_provider:
+ self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
if not found_buckets:
raise IndexError("no such lease to cancel")
def bucket_writer_closed(self, bw, consumed_size):
self.consumed += consumed_size
+ if self.stats_provider:
+ self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]
def _get_bucket_shares(self, storage_index):