From d777283e9e30fa44a6cd991b71171cf16e2748a8 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Thu, 1 Nov 2007 17:29:15 -0700 Subject: [PATCH] implement preliminary log publisher/gatherer This creates a Referenceable object that will eventually be able to publish log events to a remote subscriber (at present all it can do is provide version information). The FURL for this logport is written to 'logport.furl'. In addition, if a file named 'log_gatherer.furl' is present, the given target will be contacted and offered access to the logport. This can be used by a centralized logging agent to subscribe to logs, e.g. from all the nodes in a centrally-maintained storage grid. (think syslog -r, but with all the security properties of FURLs, and permitting non-printable strings and structured data). Once this framework matures a bit, it will be moved into Foolscap. --- docs/configuration.txt | 10 +++++ src/allmydata/logpublisher.py | 36 +++++++++++++++ src/allmydata/node.py | 10 ++++- src/allmydata/test/test_node.py | 77 ++++++++++++++++++++++++++++++--- 4 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 src/allmydata/logpublisher.py diff --git a/docs/configuration.txt b/docs/configuration.txt index a3b932e7..62e4f5a3 100644 --- a/docs/configuration.txt +++ b/docs/configuration.txt @@ -91,6 +91,16 @@ it (on operating systems that support such a concept), to insure that only the owner of the client node can use this feature. This port is intended for debugging and testing use. +logport.furl : this file contains a FURL that provides access to a 'log port' +on the client node, from which operational logs can be retrieved. Do not +grant logport access to strangers, because occasionally secret information +may be placed in the logs. + +log_gatherer.furl : if present, this file is used to contact a 'log +gatherer', which will be granted access to the logport. This can be used by +centralized storage meshes to gather operational logs in a single place. + + == Introducer/vdrive-server configuration == Introducer/vdrive-server nodes use the same 'advertised_ip_addresses' file diff --git a/src/allmydata/logpublisher.py b/src/allmydata/logpublisher.py new file mode 100644 index 00000000..4026edba --- /dev/null +++ b/src/allmydata/logpublisher.py @@ -0,0 +1,36 @@ + +import os.path +from zope.interface import implements +from twisted.application import service +from foolscap import Referenceable, RemoteInterface +from foolscap.schema import DictOf + +class RILogPublisher(RemoteInterface): + def get_versions(): + return DictOf(str, str) + +class RILogGatherer(RemoteInterface): + def logport(nodeid=str, logport=RILogPublisher): + return None + +class LogPublisher(Referenceable, service.MultiService): + implements(RILogPublisher) + name = "log_publisher" + + def __init__(self): + service.MultiService.__init__(self) + + def startService(self): + service.MultiService.startService(self) + furlfile = os.path.join(self.parent.basedir, "logport.furl") + self.parent.tub.registerReference(self, furlFile=furlfile) + os.chmod(furlfile, 0600) + + def remote_get_versions(self): + versions = self.parent.get_versions() + # our __version__ attributes are actually instances of + # allmydata.util.version_class.Version, so convert them into strings + # first. + return dict([(k,str(v)) + for k,v in versions.items()]) + diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 7310595c..2fa82bcf 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -9,6 +9,7 @@ from twisted.internet import defer, reactor from foolscap import Tub, eventual from allmydata.util import iputil, observer, humanreadable from allmydata.util.assertutil import precondition +from allmydata.logpublisher import LogPublisher # Just to get their versions: import allmydata @@ -234,7 +235,14 @@ class Node(service.MultiService): def tub_ready(self): # called when the Tub is available for registerReference - pass + self.add_service(LogPublisher()) + log_gatherer_furl = self.get_config("log_gatherer.furl") + if log_gatherer_furl: + self.tub.connectTo(log_gatherer_furl, self._log_gatherer_connected) + + def _log_gatherer_connected(self, rref): + rref.callRemote("logport", + self.nodeid, self.getServiceNamed("log_publisher")) def when_tub_ready(self): return self._tub_ready_observerlist.when_fired() diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py index ee860c00..e5597bad 100644 --- a/src/allmydata/test/test_node.py +++ b/src/allmydata/test/test_node.py @@ -1,13 +1,17 @@ -import time +import os, time +from zope.interface import implements from twisted.trial import unittest from twisted.internet import defer from twisted.python import log +from foolscap import Tub, Referenceable from foolscap.eventual import flushEventualQueue from twisted.application import service +import allmydata from allmydata.node import Node, formatTimeTahoeStyle -from allmydata.util import testutil +from allmydata.util import testutil, fileutil +from allmydata import logpublisher class LoggingMultiService(service.MultiService): def log(self, msg): @@ -29,24 +33,80 @@ class TestCase(unittest.TestCase, testutil.SignalMixin): return d def test_advertised_ip_addresses(self): - open('advertised_ip_addresses','w').write('1.2.3.4:5') + basedir = "test_node/test_advertised_ip_addresses" + fileutil.make_dirs(basedir) + f = open(os.path.join(basedir, 'advertised_ip_addresses'),'w') + f.write('1.2.3.4:5') + f.close() - n = TestNode() + n = TestNode(basedir) n.setServiceParent(self.parent) d = n.when_tub_ready() def _check_addresses(ignored_result): - self.failUnless("1.2.3.4:5" in n.tub.registerReference(n), n.tub.registerReference(n)) + furl = n.tub.registerReference(n) + self.failUnless("1.2.3.4:5" in furl, furl) d.addCallback(_check_addresses) return d def test_log(self): - n = TestNode() + basedir = "test_node/test_log" + fileutil.make_dirs(basedir) + n = TestNode(basedir) n.log("this is a message") n.log("with %d %s %s", args=(2, "interpolated", "parameters")) n.log("with bogus %d expansion", args=("not an integer",)) + def test_logpublisher(self): + basedir = "test_node/test_logpublisher" + fileutil.make_dirs(basedir) + n = TestNode(basedir) + n.setServiceParent(self.parent) + d = n.when_tub_ready() + def _ready(res): + n.log("starting up") + flogport = open(os.path.join(n.basedir,"logport.furl"), "r").read() + return n.tub.getReference(flogport.strip()) + d.addCallback(_ready) + def _got_logport(logport): + d = logport.callRemote("get_versions") + def _check(versions): + self.failUnlessEqual(versions["allmydata"], + allmydata.__version__) + d.addCallback(_check) + return d + d.addCallback(_got_logport) + return d + + def test_log_gatherer(self): + t = Tub() + t.setServiceParent(self.parent) + t.listenOn("tcp:0:interface=127.0.0.1") + l = t.getListeners()[0] + portnum = l.getPortnum() + t.setLocation("127.0.0.1:%d" % portnum) + gatherer = Gatherer() + gatherer.d = defer.Deferred() + gatherer_furl = t.registerReference(gatherer) + + basedir = "test_node/test_log_gatherer" + fileutil.make_dirs(basedir) + f = open(os.path.join(basedir, "log_gatherer.furl"), "w") + f.write(gatherer_furl + "\n") + f.close() + + n = TestNode(basedir) + n.setServiceParent(self.parent) + d = n.when_tub_ready() + def _ready(res): + n.log("starting up") + # about now, the node will be contacting the Gatherer and + # offering its logport. + return gatherer.d + d.addCallback(_ready) + return d + def test_timestamp(self): # this modified logger doesn't seem to get used during the tests, # probably because we don't modify the LogObserver that trial @@ -57,3 +117,8 @@ class TestCase(unittest.TestCase, testutil.SignalMixin): t2 = formatTimeTahoeStyle("ignored", int(time.time())) self.failUnless("Z" in t2) +class Gatherer(Referenceable): + implements(logpublisher.RILogGatherer) + def remote_logport(self, nodeid, logport): + d = logport.callRemote("get_versions") + d.addCallback(self.d.callback) -- 2.45.2