--- /dev/null
+
+import os
+from twisted.python import usage
+
+class CreateStatsGathererOptions(usage.Options):
+ optParameters = [
+ ["basedir", "C", None, "which directory to create the stats-gatherer in"],
+ ]
+
+ def parseArgs(self, basedir=None):
+ if basedir is not None:
+ assert self["basedir"] is None
+ self["basedir"] = basedir
+
+
+stats_gatherer_tac = """
+# -*- python -*-
+
+from allmydata import stats
+from twisted.application import service
+
+verbose = True
+g = stats.StatsGathererService(verbose=verbose)
+
+application = service.Application('allmydata_stats_gatherer')
+g.setServiceParent(application)
+"""
+
+def create_stats_gatherer(config):
+ out = config.stdout
+ err = config.stderr
+ basedir = config['basedir']
+ if not basedir:
+ print >>err, "a basedir was not provided, please use --basedir or -C"
+ return -1
+ if os.path.exists(basedir):
+ if os.listdir(basedir):
+ print >>err, "The base directory \"%s\", which is \"%s\" is not empty." % (basedir, os.path.abspath(basedir))
+ print >>err, "To avoid clobbering anything, I am going to quit now."
+ print >>err, "Please use a different directory, or empty this one."
+ return -1
+ # we're willing to use an empty directory
+ else:
+ os.mkdir(basedir)
+ f = open(os.path.join(basedir, "tahoe-stats-gatherer.tac"), "wb")
+ f.write(stats_gatherer_tac)
+ f.close()
+
+subCommands = [
+ ["create-stats-gatherer", None, CreateStatsGathererOptions, "Create a stats-gatherer service."],
+]
+
+dispatch = {
+ "create-stats-gatherer": create_stats_gatherer,
+ }
+
+
import os
import pickle
import pprint
-import sys
import time
from collections import deque
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
from twisted.application import service
from twisted.application.internet import TimerService
from zope.interface import implements
import foolscap
from foolscap.eventual import eventually
-from foolscap.logging.gatherer import get_local_ip_for
from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError
s["cpu_monitor.total"] = now_cpu - self.initial_cpu
return s
+
class StatsProvider(foolscap.Referenceable, service.MultiService):
implements(RIStatsProvider)
def _connected(self, gatherer, nickname):
gatherer.callRemoteOnly('provide', self, nickname or '')
+
class StatsGatherer(foolscap.Referenceable, service.MultiService):
implements(RIStatsGatherer)
poll_interval = 60
- def __init__(self, tub, basedir):
+ def __init__(self, basedir):
service.MultiService.__init__(self)
- self.tub = tub
self.basedir = basedir
self.clients = {}
self.nicknames = {}
- def startService(self):
- # the Tub must have a location set on it by now
- service.MultiService.startService(self)
self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self)
- self.registerGatherer()
-
- def get_furl(self):
- return self.my_furl
-
- def registerGatherer(self):
- furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
- self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
def get_tubid(self, rref):
return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
StatsGatherer.remote_provide(self, provider, nickname)
def announce_lost_client(self, tubid):
- print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
+ print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
def got_stats(self, stats, tubid, nickname):
print '"%s" [%s]:' % (nickname, tubid)
class PickleStatsGatherer(StdOutStatsGatherer):
# inherit from StdOutStatsGatherer for connect/disconnect notifications
- def __init__(self, tub, basedir=".", verbose=True):
+ def __init__(self, basedir=".", verbose=True):
self.verbose = verbose
- StatsGatherer.__init__(self, tub, basedir)
+ StatsGatherer.__init__(self, basedir)
self.picklefile = os.path.join(basedir, "stats.pickle")
if os.path.exists(self.picklefile):
os.unlink(self.picklefile)
os.rename(tmp, self.picklefile)
-class GathererApp(object):
- def __init__(self):
- d = self.setup_tub()
- d.addCallback(self._tub_ready)
-
- def setup_tub(self):
- self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
- self._tub.setOption("logLocalFailures", True)
- self._tub.setOption("logRemoteFailures", True)
- self._tub.startService()
- portnumfile = "portnum"
+class StatsGathererService(service.MultiService):
+ furl_file = "stats_gatherer.furl"
+
+ def __init__(self, basedir=".", verbose=False):
+ service.MultiService.__init__(self)
+ self.basedir = basedir
+ self.tub = foolscap.Tub(certFile=os.path.join(self.basedir,
+ "stats_gatherer.pem"))
+ self.tub.setServiceParent(self)
+ self.tub.setOption("logLocalFailures", True)
+ self.tub.setOption("logRemoteFailures", True)
+
+ self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
+ self.stats_gatherer.setServiceParent(self)
+
+ portnumfile = os.path.join(self.basedir, "portnum")
try:
- portnum = int(open(portnumfile, "r").read())
- except (EnvironmentError, ValueError):
- portnum = 0
- self._tub.listenOn("tcp:%d" % portnum)
- d = defer.maybeDeferred(get_local_ip_for)
- d.addCallback(self._set_location)
- d.addCallback(lambda res: self._tub)
- return d
-
- def _set_location(self, local_address):
- if local_address is None:
- local_addresses = ["127.0.0.1"]
- else:
- local_addresses = [local_address, "127.0.0.1"]
- l = self._tub.getListeners()[0]
- portnum = l.getPortnum()
- portnumfile = "portnum"
- open(portnumfile, "w").write("%d\n" % portnum)
- local_addresses = [ "%s:%d" % (addr, portnum,)
- for addr in local_addresses ]
- assert len(local_addresses) >= 1
- location = ",".join(local_addresses)
- self._tub.setLocation(location)
-
- def _tub_ready(self, tub):
- sg = PickleStatsGatherer(tub, ".")
- sg.setServiceParent(tub)
- sg.verbose = True
- print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
-
-def main(argv):
- ga = GathererApp()
- reactor.run()
-
-if __name__ == '__main__':
- main(sys.argv)
+ portnum = open(portnumfile, "r").read()
+ except EnvironmentError:
+ portnum = None
+ self.listener = self.tub.listenOn(portnum or "tcp:0")
+ d = self.tub.setLocationAutomatically()
+ if portnum is None:
+ d.addCallback(self.save_portnum)
+ d.addCallback(self.tub_ready)
+ d.addErrback(log.err)
+
+ def save_portnum(self, junk):
+ portnum = self.listener.getPortnum()
+ portnumfile = os.path.join(self.basedir, 'portnum')
+ open(portnumfile, 'wb').write('%d\n' % (portnum,))
+
+ def tub_ready(self, ignored):
+ ff = os.path.join(self.basedir, self.furl_file)
+ self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
+ furlFile=ff)
from twisted.python import failure
from twisted.application import service
from twisted.web.error import Error as WebError
-from foolscap import Tub
from foolscap.eventual import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode
from allmydata.mutable.common import CorruptShareError
from allmydata.storage import storage_index_to_dir
from allmydata.util import log, fileutil, pollmixin
-from allmydata.stats import PickleStatsGatherer
+from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService
import common_util as testutil
def _set_up_stats_gatherer(self, res):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
- t = Tub()
- self.add_service(t)
- l = t.listenOn("tcp:0")
- p = l.getPortnum()
- t.setLocation("localhost:%d" % p)
-
- self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
- self.add_service(self.stats_gatherer)
- self.stats_gatherer_furl = self.stats_gatherer.get_furl()
+ self.stats_gatherer_svc = StatsGathererService(statsdir)
+ self.stats_gatherer = self.stats_gatherer_svc.stats_gatherer
+ self.add_service(self.stats_gatherer_svc)
+
+ d = fireEventually()
+ sgf = os.path.join(statsdir, 'stats_gatherer.furl')
+ def check_for_furl():
+ return os.path.exists(sgf)
+ d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
+ def get_furl(junk):
+ self.stats_gatherer_furl = file(sgf, 'rb').read().strip()
+ d.addCallback(get_furl)
+ return d
def _set_up_key_generator(self, res):
kgsdir = self.getdir("key_generator")