From d777283e9e30fa44a6cd991b71171cf16e2748a8 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@allmydata.com>
Date: Thu, 1 Nov 2007 17:29:15 -0700
Subject: [PATCH] implement preliminary log publisher/gatherer

This creates a Referenceable object that will eventually be able to publish
log events to a remote subscriber (at present all it can do is provide
version information). The FURL for this logport is written to 'logport.furl'.

In addition, if a file named 'log_gatherer.furl' is present, the given target
will be contacted and offered access to the logport. This can be used by a
centralized logging agent to subscribe to logs, e.g. from all the nodes in a
centrally-maintained storage grid. (think syslog -r, but with all the
security properties of FURLs, and permitting non-printable strings and
structured data).

Once this framework matures a bit, it will be moved into Foolscap.
---
 docs/configuration.txt          | 10 +++++
 src/allmydata/logpublisher.py   | 36 +++++++++++++++
 src/allmydata/node.py           | 10 ++++-
 src/allmydata/test/test_node.py | 77 ++++++++++++++++++++++++++++++---
 4 files changed, 126 insertions(+), 7 deletions(-)
 create mode 100644 src/allmydata/logpublisher.py

diff --git a/docs/configuration.txt b/docs/configuration.txt
index a3b932e7..62e4f5a3 100644
--- a/docs/configuration.txt
+++ b/docs/configuration.txt
@@ -91,6 +91,16 @@ it (on operating systems that support such a concept), to insure that only
 the owner of the client node can use this feature.  This port is intended for
 debugging and testing use.
 
+logport.furl : this file contains a FURL that provides access to a 'log port'
+on the client node, from which operational logs can be retrieved. Do not
+grant logport access to strangers, because occasionally secret information
+may be placed in the logs.
+
+log_gatherer.furl : if present, this file is used to contact a 'log
+gatherer', which will be granted access to the logport. This can be used by
+centralized storage meshes to gather operational logs in a single place.
+
+
 == Introducer/vdrive-server configuration ==
 
 Introducer/vdrive-server nodes use the same 'advertised_ip_addresses' file
diff --git a/src/allmydata/logpublisher.py b/src/allmydata/logpublisher.py
new file mode 100644
index 00000000..4026edba
--- /dev/null
+++ b/src/allmydata/logpublisher.py
@@ -0,0 +1,36 @@
+
+import os.path
+from zope.interface import implements
+from twisted.application import service
+from foolscap import Referenceable, RemoteInterface
+from foolscap.schema import DictOf
+
+class RILogPublisher(RemoteInterface):
+    def get_versions():
+        return DictOf(str, str)
+
+class RILogGatherer(RemoteInterface):
+    def logport(nodeid=str, logport=RILogPublisher):
+        return None
+
+class LogPublisher(Referenceable, service.MultiService):
+    implements(RILogPublisher)
+    name = "log_publisher"
+
+    def __init__(self):
+        service.MultiService.__init__(self)
+
+    def startService(self):
+        service.MultiService.startService(self)
+        furlfile = os.path.join(self.parent.basedir, "logport.furl")
+        self.parent.tub.registerReference(self, furlFile=furlfile)
+        os.chmod(furlfile, 0600)
+
+    def remote_get_versions(self):
+        versions = self.parent.get_versions()
+        # our __version__ attributes are actually instances of
+        # allmydata.util.version_class.Version, so convert them into strings
+        # first.
+        return dict([(k,str(v))
+                     for k,v in versions.items()])
+
diff --git a/src/allmydata/node.py b/src/allmydata/node.py
index 7310595c..2fa82bcf 100644
--- a/src/allmydata/node.py
+++ b/src/allmydata/node.py
@@ -9,6 +9,7 @@ from twisted.internet import defer, reactor
 from foolscap import Tub, eventual
 from allmydata.util import iputil, observer, humanreadable
 from allmydata.util.assertutil import precondition
+from allmydata.logpublisher import LogPublisher
 
 # Just to get their versions:
 import allmydata
@@ -234,7 +235,14 @@ class Node(service.MultiService):
 
     def tub_ready(self):
         # called when the Tub is available for registerReference
-        pass
+        self.add_service(LogPublisher())
+        log_gatherer_furl = self.get_config("log_gatherer.furl")
+        if log_gatherer_furl:
+            self.tub.connectTo(log_gatherer_furl, self._log_gatherer_connected)
+
+    def _log_gatherer_connected(self, rref):
+        rref.callRemote("logport",
+                        self.nodeid, self.getServiceNamed("log_publisher"))
 
     def when_tub_ready(self):
         return self._tub_ready_observerlist.when_fired()
diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py
index ee860c00..e5597bad 100644
--- a/src/allmydata/test/test_node.py
+++ b/src/allmydata/test/test_node.py
@@ -1,13 +1,17 @@
 
-import time
+import os, time
+from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.python import log
 
+from foolscap import Tub, Referenceable
 from foolscap.eventual import flushEventualQueue
 from twisted.application import service
+import allmydata
 from allmydata.node import Node, formatTimeTahoeStyle
-from allmydata.util import testutil
+from allmydata.util import testutil, fileutil
+from allmydata import logpublisher
 
 class LoggingMultiService(service.MultiService):
     def log(self, msg):
@@ -29,24 +33,80 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
         return d
 
     def test_advertised_ip_addresses(self):
-        open('advertised_ip_addresses','w').write('1.2.3.4:5')
+        basedir = "test_node/test_advertised_ip_addresses"
+        fileutil.make_dirs(basedir)
+        f = open(os.path.join(basedir, 'advertised_ip_addresses'),'w')
+        f.write('1.2.3.4:5')
+        f.close()
 
-        n = TestNode()
+        n = TestNode(basedir)
         n.setServiceParent(self.parent)
         d = n.when_tub_ready()
 
         def _check_addresses(ignored_result):
-            self.failUnless("1.2.3.4:5" in n.tub.registerReference(n), n.tub.registerReference(n))
+            furl = n.tub.registerReference(n)
+            self.failUnless("1.2.3.4:5" in furl, furl)
 
         d.addCallback(_check_addresses)
         return d
 
     def test_log(self):
-        n = TestNode()
+        basedir = "test_node/test_log"
+        fileutil.make_dirs(basedir)
+        n = TestNode(basedir)
         n.log("this is a message")
         n.log("with %d %s %s", args=(2, "interpolated", "parameters"))
         n.log("with bogus %d expansion", args=("not an integer",))
 
+    def test_logpublisher(self):
+        basedir = "test_node/test_logpublisher"
+        fileutil.make_dirs(basedir)
+        n = TestNode(basedir)
+        n.setServiceParent(self.parent)
+        d = n.when_tub_ready()
+        def _ready(res):
+            n.log("starting up")
+            flogport = open(os.path.join(n.basedir,"logport.furl"), "r").read()
+            return n.tub.getReference(flogport.strip())
+        d.addCallback(_ready)
+        def _got_logport(logport):
+            d = logport.callRemote("get_versions")
+            def _check(versions):
+                self.failUnlessEqual(versions["allmydata"],
+                                     allmydata.__version__)
+            d.addCallback(_check)
+            return d
+        d.addCallback(_got_logport)
+        return d
+
+    def test_log_gatherer(self):
+        t = Tub()
+        t.setServiceParent(self.parent)
+        t.listenOn("tcp:0:interface=127.0.0.1")
+        l = t.getListeners()[0]
+        portnum = l.getPortnum()
+        t.setLocation("127.0.0.1:%d" % portnum)
+        gatherer = Gatherer()
+        gatherer.d = defer.Deferred()
+        gatherer_furl = t.registerReference(gatherer)
+
+        basedir = "test_node/test_log_gatherer"
+        fileutil.make_dirs(basedir)
+        f = open(os.path.join(basedir, "log_gatherer.furl"), "w")
+        f.write(gatherer_furl + "\n")
+        f.close()
+
+        n = TestNode(basedir)
+        n.setServiceParent(self.parent)
+        d = n.when_tub_ready()
+        def _ready(res):
+            n.log("starting up")
+            # about now, the node will be contacting the Gatherer and
+            # offering its logport.
+            return gatherer.d
+        d.addCallback(_ready)
+        return d
+
     def test_timestamp(self):
         # this modified logger doesn't seem to get used during the tests,
         # probably because we don't modify the LogObserver that trial
@@ -57,3 +117,8 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
         t2 = formatTimeTahoeStyle("ignored", int(time.time()))
         self.failUnless("Z" in t2)
 
+class Gatherer(Referenceable):
+    implements(logpublisher.RILogGatherer)
+    def remote_logport(self, nodeid, logport):
+        d = logport.callRemote("get_versions")
+        d.addCallback(self.d.callback)
-- 
2.45.2