From 8473a96adad83ba1144a713139803da7dfc9fc56 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Tue, 18 Nov 2008 01:46:20 -0700 Subject: [PATCH] #330: convert stats-gatherer into a .tac file service, add 'tahoe create-stats-gatherer' --- src/allmydata/scripts/runner.py | 5 +- src/allmydata/scripts/stats_gatherer.py | 57 +++++++++++++ src/allmydata/stats.py | 108 +++++++++--------------- src/allmydata/test/common.py | 25 +++--- 4 files changed, 117 insertions(+), 78 deletions(-) create mode 100644 src/allmydata/scripts/stats_gatherer.py diff --git a/src/allmydata/scripts/runner.py b/src/allmydata/scripts/runner.py index fcb5d147..235cc2a0 100644 --- a/src/allmydata/scripts/runner.py +++ b/src/allmydata/scripts/runner.py @@ -8,10 +8,11 @@ from twisted.python import usage pkg_resources.require('allmydata-tahoe') from allmydata.scripts.common import BaseOptions -import debug, create_node, startstop_node, cli, keygen +import debug, create_node, startstop_node, cli, keygen, stats_gatherer _general_commands = ( create_node.subCommands + keygen.subCommands + + stats_gatherer.subCommands + debug.subCommands + cli.subCommands ) @@ -77,6 +78,8 @@ def runner(argv, rc = cli.dispatch[command](so) elif command in keygen.dispatch: rc = keygen.dispatch[command](so, stdout, stderr) + elif command in stats_gatherer.dispatch: + rc = stats_gatherer.dispatch[command](so) elif command in ac_dispatch: rc = ac_dispatch[command](so, stdout, stderr) else: diff --git a/src/allmydata/scripts/stats_gatherer.py b/src/allmydata/scripts/stats_gatherer.py new file mode 100644 index 00000000..c7f8fdd3 --- /dev/null +++ b/src/allmydata/scripts/stats_gatherer.py @@ -0,0 +1,57 @@ + +import os +from twisted.python import usage + +class CreateStatsGathererOptions(usage.Options): + optParameters = [ + ["basedir", "C", None, "which directory to create the stats-gatherer in"], + ] + + def parseArgs(self, basedir=None): + if basedir is not None: + assert self["basedir"] is None + self["basedir"] = basedir + + +stats_gatherer_tac = """ +# -*- python -*- + +from allmydata import stats +from twisted.application import service + +verbose = True +g = stats.StatsGathererService(verbose=verbose) + +application = service.Application('allmydata_stats_gatherer') +g.setServiceParent(application) +""" + +def create_stats_gatherer(config): + out = config.stdout + err = config.stderr + basedir = config['basedir'] + if not basedir: + print >>err, "a basedir was not provided, please use --basedir or -C" + return -1 + if os.path.exists(basedir): + if os.listdir(basedir): + print >>err, "The base directory \"%s\", which is \"%s\" is not empty." % (basedir, os.path.abspath(basedir)) + print >>err, "To avoid clobbering anything, I am going to quit now." + print >>err, "Please use a different directory, or empty this one." + return -1 + # we're willing to use an empty directory + else: + os.mkdir(basedir) + f = open(os.path.join(basedir, "tahoe-stats-gatherer.tac"), "wb") + f.write(stats_gatherer_tac) + f.close() + +subCommands = [ + ["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."], +] + +dispatch = { + "create-stats-gatherer": create_stats_gatherer, + } + + diff --git a/src/allmydata/stats.py b/src/allmydata/stats.py index 0b4af67f..fc9ca9e7 100644 --- a/src/allmydata/stats.py +++ b/src/allmydata/stats.py @@ -2,17 +2,15 @@ import os import pickle import pprint -import sys import time from collections import deque -from twisted.internet import reactor, defer +from twisted.internet import reactor from twisted.application import service from twisted.application.internet import TimerService from zope.interface import implements import foolscap from foolscap.eventual import eventually -from foolscap.logging.gatherer import get_local_ip_for from twisted.internet.error import ConnectionDone, ConnectionLost from foolscap import DeadReferenceError @@ -125,6 +123,7 @@ class CPUUsageMonitor(service.MultiService): s["cpu_monitor.total"] = now_cpu - self.initial_cpu return s + class StatsProvider(foolscap.Referenceable, service.MultiService): implements(RIStatsProvider) @@ -180,32 +179,21 @@ class StatsProvider(foolscap.Referenceable, service.MultiService): def _connected(self, gatherer, nickname): gatherer.callRemoteOnly('provide', self, nickname or '') + class StatsGatherer(foolscap.Referenceable, service.MultiService): implements(RIStatsGatherer) poll_interval = 60 - def __init__(self, tub, basedir): + def __init__(self, basedir): service.MultiService.__init__(self) - self.tub = tub self.basedir = basedir self.clients = {} self.nicknames = {} - def startService(self): - # the Tub must have a location set on it by now - service.MultiService.startService(self) self.timer = TimerService(self.poll_interval, self.poll) self.timer.setServiceParent(self) - self.registerGatherer() - - def get_furl(self): - return self.my_furl - - def registerGatherer(self): - furl_file = os.path.join(self.basedir, "stats_gatherer.furl") - self.my_furl = self.tub.registerReference(self, furlFile=furl_file) def get_tubid(self, rref): return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID() @@ -251,7 +239,7 @@ class StdOutStatsGatherer(StatsGatherer): StatsGatherer.remote_provide(self, provider, nickname) def announce_lost_client(self, tubid): - print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid) + print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid) def got_stats(self, stats, tubid, nickname): print '"%s" [%s]:' % (nickname, tubid) @@ -260,9 +248,9 @@ class StdOutStatsGatherer(StatsGatherer): class PickleStatsGatherer(StdOutStatsGatherer): # inherit from StdOutStatsGatherer for connect/disconnect notifications - def __init__(self, tub, basedir=".", verbose=True): + def __init__(self, basedir=".", verbose=True): self.verbose = verbose - StatsGatherer.__init__(self, tub, basedir) + StatsGatherer.__init__(self, basedir) self.picklefile = os.path.join(basedir, "stats.pickle") if os.path.exists(self.picklefile): @@ -288,51 +276,39 @@ class PickleStatsGatherer(StdOutStatsGatherer): os.unlink(self.picklefile) os.rename(tmp, self.picklefile) -class GathererApp(object): - def __init__(self): - d = self.setup_tub() - d.addCallback(self._tub_ready) - - def setup_tub(self): - self._tub = foolscap.Tub(certFile="stats_gatherer.pem") - self._tub.setOption("logLocalFailures", True) - self._tub.setOption("logRemoteFailures", True) - self._tub.startService() - portnumfile = "portnum" +class StatsGathererService(service.MultiService): + furl_file = "stats_gatherer.furl" + + def __init__(self, basedir=".", verbose=False): + service.MultiService.__init__(self) + self.basedir = basedir + self.tub = foolscap.Tub(certFile=os.path.join(self.basedir, + "stats_gatherer.pem")) + self.tub.setServiceParent(self) + self.tub.setOption("logLocalFailures", True) + self.tub.setOption("logRemoteFailures", True) + + self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose) + self.stats_gatherer.setServiceParent(self) + + portnumfile = os.path.join(self.basedir, "portnum") try: - portnum = int(open(portnumfile, "r").read()) - except (EnvironmentError, ValueError): - portnum = 0 - self._tub.listenOn("tcp:%d" % portnum) - d = defer.maybeDeferred(get_local_ip_for) - d.addCallback(self._set_location) - d.addCallback(lambda res: self._tub) - return d - - def _set_location(self, local_address): - if local_address is None: - local_addresses = ["127.0.0.1"] - else: - local_addresses = [local_address, "127.0.0.1"] - l = self._tub.getListeners()[0] - portnum = l.getPortnum() - portnumfile = "portnum" - open(portnumfile, "w").write("%d\n" % portnum) - local_addresses = [ "%s:%d" % (addr, portnum,) - for addr in local_addresses ] - assert len(local_addresses) >= 1 - location = ",".join(local_addresses) - self._tub.setLocation(location) - - def _tub_ready(self, tub): - sg = PickleStatsGatherer(tub, ".") - sg.setServiceParent(tub) - sg.verbose = True - print '\nStatsGatherer: %s\n' % (sg.get_furl(),) - -def main(argv): - ga = GathererApp() - reactor.run() - -if __name__ == '__main__': - main(sys.argv) + portnum = open(portnumfile, "r").read() + except EnvironmentError: + portnum = None + self.listener = self.tub.listenOn(portnum or "tcp:0") + d = self.tub.setLocationAutomatically() + if portnum is None: + d.addCallback(self.save_portnum) + d.addCallback(self.tub_ready) + d.addErrback(log.err) + + def save_portnum(self, junk): + portnum = self.listener.getPortnum() + portnumfile = os.path.join(self.basedir, 'portnum') + open(portnumfile, 'wb').write('%d\n' % (portnum,)) + + def tub_ready(self, ignored): + ff = os.path.join(self.basedir, self.furl_file) + self.gatherer_furl = self.tub.registerReference(self.stats_gatherer, + furlFile=ff) diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index e95a59d1..2585034b 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -6,7 +6,6 @@ from twisted.internet.interfaces import IConsumer from twisted.python import failure from twisted.application import service from twisted.web.error import Error as WebError -from foolscap import Tub from foolscap.eventual import flushEventualQueue, fireEventually from allmydata import uri, dirnode, client from allmydata.introducer.server import IntroducerNode @@ -17,7 +16,7 @@ from allmydata.checker_results import CheckerResults, CheckAndRepairResults, \ from allmydata.mutable.common import CorruptShareError from allmydata.storage import storage_index_to_dir from allmydata.util import log, fileutil, pollmixin -from allmydata.stats import PickleStatsGatherer +from allmydata.stats import StatsGathererService from allmydata.key_generator import KeyGeneratorService import common_util as testutil @@ -355,15 +354,19 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): def _set_up_stats_gatherer(self, res): statsdir = self.getdir("stats_gatherer") fileutil.make_dirs(statsdir) - t = Tub() - self.add_service(t) - l = t.listenOn("tcp:0") - p = l.getPortnum() - t.setLocation("localhost:%d" % p) - - self.stats_gatherer = PickleStatsGatherer(t, statsdir, False) - self.add_service(self.stats_gatherer) - self.stats_gatherer_furl = self.stats_gatherer.get_furl() + self.stats_gatherer_svc = StatsGathererService(statsdir) + self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer + self.add_service(self.stats_gatherer_svc) + + d = fireEventually() + sgf = os.path.join(statsdir, 'stats_gatherer.furl') + def check_for_furl(): + return os.path.exists(sgf) + d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30)) + def get_furl(junk): + self.stats_gatherer_furl = file(sgf, 'rb').read().strip() + d.addCallback(get_furl) + return d def _set_up_key_generator(self, res): kgsdir = self.getdir("key_generator") -- 2.45.2