#330: convert stats-gatherer into a .tac file service, add 'tahoe create-stats-gatherer'
authorBrian Warner <warner@allmydata.com>
Tue, 18 Nov 2008 08:46:20 +0000 (01:46 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 18 Nov 2008 08:46:20 +0000 (01:46 -0700)
src/allmydata/scripts/runner.py
src/allmydata/scripts/stats_gatherer.py [new file with mode: 0644]
src/allmydata/stats.py
src/allmydata/test/common.py

index fcb5d14797d9e37fd0952ed0e4d92dd0739773d7..235cc2a072b9bb3704963dc5fe359d88cd97efa8 100644 (file)
@@ -8,10 +8,11 @@ from twisted.python import usage
 
 pkg_resources.require('allmydata-tahoe')
 from allmydata.scripts.common import BaseOptions
-import debug, create_node, startstop_node, cli, keygen
+import debug, create_node, startstop_node, cli, keygen, stats_gatherer
 
 _general_commands = ( create_node.subCommands
                     + keygen.subCommands
+                    + stats_gatherer.subCommands
                     + debug.subCommands
                     + cli.subCommands
                     )
@@ -77,6 +78,8 @@ def runner(argv,
         rc = cli.dispatch[command](so)
     elif command in keygen.dispatch:
         rc = keygen.dispatch[command](so, stdout, stderr)
+    elif command in stats_gatherer.dispatch:
+        rc = stats_gatherer.dispatch[command](so)
     elif command in ac_dispatch:
         rc = ac_dispatch[command](so, stdout, stderr)
     else:
diff --git a/src/allmydata/scripts/stats_gatherer.py b/src/allmydata/scripts/stats_gatherer.py
new file mode 100644 (file)
index 0000000..c7f8fdd
--- /dev/null
@@ -0,0 +1,57 @@
+
+import os
+from twisted.python import usage
+
+class CreateStatsGathererOptions(usage.Options):
+    optParameters = [
+        ["basedir", "C", None, "which directory to create the stats-gatherer in"],
+        ]
+
+    def parseArgs(self, basedir=None):
+        if basedir is not None:
+            assert self["basedir"] is None
+            self["basedir"] = basedir
+
+
+stats_gatherer_tac = """
+# -*- python -*-
+
+from allmydata import stats
+from twisted.application import service
+
+verbose = True
+g = stats.StatsGathererService(verbose=verbose)
+
+application = service.Application('allmydata_stats_gatherer')
+g.setServiceParent(application)
+"""
+
+def create_stats_gatherer(config):
+    out = config.stdout
+    err = config.stderr
+    basedir = config['basedir']
+    if not basedir:
+        print >>err, "a basedir was not provided, please use --basedir or -C"
+        return -1
+    if os.path.exists(basedir):
+        if os.listdir(basedir):
+            print >>err, "The base directory \"%s\", which is \"%s\" is not empty." % (basedir, os.path.abspath(basedir))
+            print >>err, "To avoid clobbering anything, I am going to quit now."
+            print >>err, "Please use a different directory, or empty this one."
+            return -1
+        # we're willing to use an empty directory
+    else:
+        os.mkdir(basedir)
+    f = open(os.path.join(basedir, "tahoe-stats-gatherer.tac"), "wb")
+    f.write(stats_gatherer_tac)
+    f.close()
+
+subCommands = [
+    ["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."],
+]
+
+dispatch = {
+    "create-stats-gatherer": create_stats_gatherer,
+    }
+
+
index 0b4af67ff8f0978ea316172f4eb41ce26d2ada91..fc9ca9e7b0f1aa9961f374a3f2aa7591767c0dd9 100644 (file)
@@ -2,17 +2,15 @@
 import os
 import pickle
 import pprint
-import sys
 import time
 from collections import deque
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.application import service
 from twisted.application.internet import TimerService
 from zope.interface import implements
 import foolscap
 from foolscap.eventual import eventually
-from foolscap.logging.gatherer import get_local_ip_for
 from twisted.internet.error import ConnectionDone, ConnectionLost
 from foolscap import DeadReferenceError
 
@@ -125,6 +123,7 @@ class CPUUsageMonitor(service.MultiService):
         s["cpu_monitor.total"] = now_cpu - self.initial_cpu
         return s
 
+
 class StatsProvider(foolscap.Referenceable, service.MultiService):
     implements(RIStatsProvider)
 
@@ -180,32 +179,21 @@ class StatsProvider(foolscap.Referenceable, service.MultiService):
     def _connected(self, gatherer, nickname):
         gatherer.callRemoteOnly('provide', self, nickname or '')
 
+
 class StatsGatherer(foolscap.Referenceable, service.MultiService):
     implements(RIStatsGatherer)
 
     poll_interval = 60
 
-    def __init__(self, tub, basedir):
+    def __init__(self, basedir):
         service.MultiService.__init__(self)
-        self.tub = tub
         self.basedir = basedir
 
         self.clients = {}
         self.nicknames = {}
 
-    def startService(self):
-        # the Tub must have a location set on it by now
-        service.MultiService.startService(self)
         self.timer = TimerService(self.poll_interval, self.poll)
         self.timer.setServiceParent(self)
-        self.registerGatherer()
-
-    def get_furl(self):
-        return self.my_furl
-
-    def registerGatherer(self):
-        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
-        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
 
     def get_tubid(self, rref):
         return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
@@ -251,7 +239,7 @@ class StdOutStatsGatherer(StatsGatherer):
         StatsGatherer.remote_provide(self, provider, nickname)
 
     def announce_lost_client(self, tubid):
-        print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
+        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
 
     def got_stats(self, stats, tubid, nickname):
         print '"%s" [%s]:' % (nickname, tubid)
@@ -260,9 +248,9 @@ class StdOutStatsGatherer(StatsGatherer):
 class PickleStatsGatherer(StdOutStatsGatherer):
     # inherit from StdOutStatsGatherer for connect/disconnect notifications
 
-    def __init__(self, tub, basedir=".", verbose=True):
+    def __init__(self, basedir=".", verbose=True):
         self.verbose = verbose
-        StatsGatherer.__init__(self, tub, basedir)
+        StatsGatherer.__init__(self, basedir)
         self.picklefile = os.path.join(basedir, "stats.pickle")
 
         if os.path.exists(self.picklefile):
@@ -288,51 +276,39 @@ class PickleStatsGatherer(StdOutStatsGatherer):
             os.unlink(self.picklefile)
         os.rename(tmp, self.picklefile)
 
-class GathererApp(object):
-    def __init__(self):
-        d = self.setup_tub()
-        d.addCallback(self._tub_ready)
-
-    def setup_tub(self):
-        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
-        self._tub.setOption("logLocalFailures", True)
-        self._tub.setOption("logRemoteFailures", True)
-        self._tub.startService()
-        portnumfile = "portnum"
+class StatsGathererService(service.MultiService):
+    furl_file = "stats_gatherer.furl"
+
+    def __init__(self, basedir=".", verbose=False):
+        service.MultiService.__init__(self)
+        self.basedir = basedir
+        self.tub = foolscap.Tub(certFile=os.path.join(self.basedir,
+                                                      "stats_gatherer.pem"))
+        self.tub.setServiceParent(self)
+        self.tub.setOption("logLocalFailures", True)
+        self.tub.setOption("logRemoteFailures", True)
+
+        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
+        self.stats_gatherer.setServiceParent(self)
+
+        portnumfile = os.path.join(self.basedir, "portnum")
         try:
-            portnum = int(open(portnumfile, "r").read())
-        except (EnvironmentError, ValueError):
-            portnum = 0
-        self._tub.listenOn("tcp:%d" % portnum)
-        d = defer.maybeDeferred(get_local_ip_for)
-        d.addCallback(self._set_location)
-        d.addCallback(lambda res: self._tub)
-        return d
-
-    def _set_location(self, local_address):
-        if local_address is None:
-            local_addresses = ["127.0.0.1"]
-        else:
-            local_addresses = [local_address, "127.0.0.1"]
-        l = self._tub.getListeners()[0]
-        portnum = l.getPortnum()
-        portnumfile = "portnum"
-        open(portnumfile, "w").write("%d\n" % portnum)
-        local_addresses = [ "%s:%d" % (addr, portnum,)
-                            for addr in local_addresses ]
-        assert len(local_addresses) >= 1
-        location = ",".join(local_addresses)
-        self._tub.setLocation(location)
-
-    def _tub_ready(self, tub):
-        sg = PickleStatsGatherer(tub, ".")
-        sg.setServiceParent(tub)
-        sg.verbose = True
-        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
-
-def main(argv):
-    ga = GathererApp()
-    reactor.run()
-
-if __name__ == '__main__':
-    main(sys.argv)
+            portnum = open(portnumfile, "r").read()
+        except EnvironmentError:
+            portnum = None
+        self.listener = self.tub.listenOn(portnum or "tcp:0")
+        d = self.tub.setLocationAutomatically()
+        if portnum is None:
+            d.addCallback(self.save_portnum)
+        d.addCallback(self.tub_ready)
+        d.addErrback(log.err)
+
+    def save_portnum(self, junk):
+        portnum = self.listener.getPortnum()
+        portnumfile = os.path.join(self.basedir, 'portnum')
+        open(portnumfile, 'wb').write('%d\n' % (portnum,))
+
+    def tub_ready(self, ignored):
+        ff = os.path.join(self.basedir, self.furl_file)
+        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
+                                                        furlFile=ff)
index e95a59d166650c3571b9cad23e60e7fb4a08f7f6..2585034b69e11ac6c99e8e0407dbb6555351e52c 100644 (file)
@@ -6,7 +6,6 @@ from twisted.internet.interfaces import IConsumer
 from twisted.python import failure
 from twisted.application import service
 from twisted.web.error import Error as WebError
-from foolscap import Tub
 from foolscap.eventual import flushEventualQueue, fireEventually
 from allmydata import uri, dirnode, client
 from allmydata.introducer.server import IntroducerNode
@@ -17,7 +16,7 @@ from allmydata.checker_results import CheckerResults, CheckAndRepairResults, \
 from allmydata.mutable.common import CorruptShareError
 from allmydata.storage import storage_index_to_dir
 from allmydata.util import log, fileutil, pollmixin
-from allmydata.stats import PickleStatsGatherer
+from allmydata.stats import StatsGathererService
 from allmydata.key_generator import KeyGeneratorService
 import common_util as testutil
 
@@ -355,15 +354,19 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
     def _set_up_stats_gatherer(self, res):
         statsdir = self.getdir("stats_gatherer")
         fileutil.make_dirs(statsdir)
-        t = Tub()
-        self.add_service(t)
-        l = t.listenOn("tcp:0")
-        p = l.getPortnum()
-        t.setLocation("localhost:%d" % p)
-
-        self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
-        self.add_service(self.stats_gatherer)
-        self.stats_gatherer_furl = self.stats_gatherer.get_furl()
+        self.stats_gatherer_svc = StatsGathererService(statsdir)
+        self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
+        self.add_service(self.stats_gatherer_svc)
+
+        d = fireEventually()
+        sgf = os.path.join(statsdir, 'stats_gatherer.furl')
+        def check_for_furl():
+            return os.path.exists(sgf)
+        d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
+        def get_furl(junk):
+            self.stats_gatherer_furl = file(sgf, 'rb').read().strip()
+        d.addCallback(get_furl)
+        return d
 
     def _set_up_key_generator(self, res):
         kgsdir = self.getdir("key_generator")