From b29ce1c30a59f69a1f72dc16a282fb6a41594b50 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Fri, 16 Nov 2007 20:07:50 -0700
Subject: [PATCH] logpublisher: implement subscribe/publish for log, add a
 sample client

---
 misc/get-logs.py                | 43 ++++++++++++++++++++++++++
 src/allmydata/logpublisher.py   | 55 ++++++++++++++++++++++++++++++++-
 src/allmydata/test/test_node.py | 27 +++++++++++++++-
 3 files changed, 123 insertions(+), 2 deletions(-)
 create mode 100644 misc/get-logs.py

diff --git a/misc/get-logs.py b/misc/get-logs.py
new file mode 100644
index 00000000..4471d4a0
--- /dev/null
+++ b/misc/get-logs.py
@@ -0,0 +1,43 @@
+#! /usr/bin/python
+
+import sys
+import foolscap
+from foolscap.schema import DictOf, Any
+from twisted.internet import reactor
+from zope.interface import implements
+from twisted.python import log
+#log.startLogging(sys.stderr)
+
+
+class RILogObserver(foolscap.RemoteInterface):
+    def msg(logmsg=DictOf(str, Any())):
+        return None
+
+class LogFetcher(foolscap.Referenceable):
+    implements(RILogObserver)
+    def start(self, target_furl):
+        print "Connecting.."
+        self._tub = foolscap.Tub()
+        self._tub.startService()
+        d = self._tub.getReference(target_furl)
+        d.addCallback(self._got_logpublisher)
+        d.addErrback(self._error)
+
+    def _error(self, f):
+        print "ERROR", f
+        reactor.stop()
+
+    def _got_logpublisher(self, publisher):
+        print "Connected"
+        d = publisher.callRemote("subscribe_to_all", self)
+        d.addErrback(self._error)
+
+    def remote_msg(self, d):
+        print d
+
+
+target_furl = sys.argv[1]
+lf = LogFetcher()
+lf.start(target_furl)
+#print "starting.."
+reactor.run()
diff --git a/src/allmydata/logpublisher.py b/src/allmydata/logpublisher.py
index 4026edba..2c174b3a 100644
--- a/src/allmydata/logpublisher.py
+++ b/src/allmydata/logpublisher.py
@@ -2,23 +2,42 @@
 import os.path
 from zope.interface import implements
 from twisted.application import service
+from twisted.python import log
 from foolscap import Referenceable, RemoteInterface
-from foolscap.schema import DictOf
+from foolscap.schema import DictOf, Any
+from foolscap.eventual import eventually
+
+class RILogObserver(RemoteInterface):
+    def msg(logmsg=DictOf(str, Any())):
+        return None
+class RISubscription(RemoteInterface):
+    pass
 
 class RILogPublisher(RemoteInterface):
     def get_versions():
         return DictOf(str, str)
+    def subscribe_to_all(observer=RILogObserver):
+        return RISubscription
+    def unsubscribe(subscription=Any()):
+        # I don't know how to get the constraint right: unsubscribe() should
+        # accept return value of subscribe_to_all()
+        return None
 
 class RILogGatherer(RemoteInterface):
     def logport(nodeid=str, logport=RILogPublisher):
         return None
 
+class Subscription(Referenceable):
+    implements(RISubscription)
+
 class LogPublisher(Referenceable, service.MultiService):
     implements(RILogPublisher)
     name = "log_publisher"
 
     def __init__(self):
         service.MultiService.__init__(self)
+        self._subscribers = {}
+        self._notifyOnDisconnectors = {}
 
     def startService(self):
         service.MultiService.startService(self)
@@ -26,6 +45,28 @@ class LogPublisher(Referenceable, service.MultiService):
         self.parent.tub.registerReference(self, furlFile=furlfile)
         os.chmod(furlfile, 0600)
 
+        log.addObserver(self._twisted_log_observer)
+
+    def stopService(self):
+        log.removeObserver(self._twisted_log_observer)
+        return service.MultiService.stopService(self)
+
+    def _twisted_log_observer(self, d):
+        # Twisted will remove this for us if it fails.
+
+        # keys:
+        #  ['message']: *args
+        #  ['time']: float
+        #  ['isError']: bool, usually False
+        #  ['system']: string
+
+        for o in self._subscribers.values():
+            o.callRemoteOnly("msg", d)
+
+        #f = open("/tmp/f.out", "a")
+        #print >>f, d['message']
+        #f.close()
+
     def remote_get_versions(self):
         versions = self.parent.get_versions()
         # our __version__ attributes are actually instances of
@@ -34,3 +75,15 @@ class LogPublisher(Referenceable, service.MultiService):
         return dict([(k,str(v))
                      for k,v in versions.items()])
 
+    def remote_subscribe_to_all(self, observer):
+        s = Subscription()
+        self._subscribers[s] = observer
+        c = observer.notifyOnDisconnect(self.remote_unsubscribe, s)
+        self._notifyOnDisconnectors[s] = c
+        return s
+
+    def remote_unsubscribe(self, s):
+        observer = self._subscribers.pop(s)
+        c = self._notifyOnDisconnectors.pop(s)
+        observer.dontNotifyOnDisconnect(c)
+
diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py
index e5597bad..c66a76f3 100644
--- a/src/allmydata/test/test_node.py
+++ b/src/allmydata/test/test_node.py
@@ -6,7 +6,7 @@ from twisted.internet import defer
 from twisted.python import log
 
 from foolscap import Tub, Referenceable
-from foolscap.eventual import flushEventualQueue
+from foolscap.eventual import fireEventually, flushEventualQueue
 from twisted.application import service
 import allmydata
 from allmydata.node import Node, formatTimeTahoeStyle
@@ -61,6 +61,7 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
     def test_logpublisher(self):
         basedir = "test_node/test_logpublisher"
         fileutil.make_dirs(basedir)
+        observer = LogObserver()
         n = TestNode(basedir)
         n.setServiceParent(self.parent)
         d = n.when_tub_ready()
@@ -75,6 +76,23 @@ class TestCase(unittest.TestCase, testutil.SignalMixin):
                 self.failUnlessEqual(versions["allmydata"],
                                      allmydata.__version__)
             d.addCallback(_check)
+            d.addCallback(lambda res:
+                          logport.callRemote("subscribe_to_all", observer))
+            def _emit(subscription):
+                self._subscription = subscription
+                log.msg("message here")
+            d.addCallback(_emit)
+            d.addCallback(fireEventually)
+            d.addCallback(fireEventually)
+            def _check_observer(res):
+                msgs = observer.messages
+                self.failUnlessEqual(len(msgs), 1)
+                #print msgs
+                self.failUnlessEqual(msgs[0]["message"], ("message here",) )
+            d.addCallback(_check_observer)
+            def _done(res):
+                return logport.callRemote("unsubscribe", self._subscription)
+            d.addCallback(_done)
             return d
         d.addCallback(_got_logport)
         return d
@@ -122,3 +140,10 @@ class Gatherer(Referenceable):
     def remote_logport(self, nodeid, logport):
         d = logport.callRemote("get_versions")
         d.addCallback(self.d.callback)
+
+class LogObserver(Referenceable):
+    implements(logpublisher.RILogObserver)
+    def __init__(self):
+        self.messages = []
+    def remote_msg(self, d):
+        self.messages.append(d)
-- 
2.45.2