From: Brian Warner Date: Sat, 17 Nov 2007 03:07:50 +0000 (-0700) Subject: logpublisher: implement subscribe/publish for log, add a sample client X-Git-Url: https://git.rkrishnan.org/pf/content/en/using.html?a=commitdiff_plain;h=b29ce1c30a59f69a1f72dc16a282fb6a41594b50;p=tahoe-lafs%2Ftahoe-lafs.git logpublisher: implement subscribe/publish for log, add a sample client --- diff --git a/misc/get-logs.py b/misc/get-logs.py new file mode 100644 index 00000000..4471d4a0 --- /dev/null +++ b/misc/get-logs.py @@ -0,0 +1,43 @@ +#! /usr/bin/python + +import sys +import foolscap +from foolscap.schema import DictOf, Any +from twisted.internet import reactor +from zope.interface import implements +from twisted.python import log +#log.startLogging(sys.stderr) + + +class RILogObserver(foolscap.RemoteInterface): + def msg(logmsg=DictOf(str, Any())): + return None + +class LogFetcher(foolscap.Referenceable): + implements(RILogObserver) + def start(self, target_furl): + print "Connecting.." + self._tub = foolscap.Tub() + self._tub.startService() + d = self._tub.getReference(target_furl) + d.addCallback(self._got_logpublisher) + d.addErrback(self._error) + + def _error(self, f): + print "ERROR", f + reactor.stop() + + def _got_logpublisher(self, publisher): + print "Connected" + d = publisher.callRemote("subscribe_to_all", self) + d.addErrback(self._error) + + def remote_msg(self, d): + print d + + +target_furl = sys.argv[1] +lf = LogFetcher() +lf.start(target_furl) +#print "starting.." +reactor.run() diff --git a/src/allmydata/logpublisher.py b/src/allmydata/logpublisher.py index 4026edba..2c174b3a 100644 --- a/src/allmydata/logpublisher.py +++ b/src/allmydata/logpublisher.py @@ -2,23 +2,42 @@ import os.path from zope.interface import implements from twisted.application import service +from twisted.python import log from foolscap import Referenceable, RemoteInterface -from foolscap.schema import DictOf +from foolscap.schema import DictOf, Any +from foolscap.eventual import eventually + +class RILogObserver(RemoteInterface): + def msg(logmsg=DictOf(str, Any())): + return None +class RISubscription(RemoteInterface): + pass class RILogPublisher(RemoteInterface): def get_versions(): return DictOf(str, str) + def subscribe_to_all(observer=RILogObserver): + return RISubscription + def unsubscribe(subscription=Any()): + # I don't know how to get the constraint right: unsubscribe() should + # accept return value of subscribe_to_all() + return None class RILogGatherer(RemoteInterface): def logport(nodeid=str, logport=RILogPublisher): return None +class Subscription(Referenceable): + implements(RISubscription) + class LogPublisher(Referenceable, service.MultiService): implements(RILogPublisher) name = "log_publisher" def __init__(self): service.MultiService.__init__(self) + self._subscribers = {} + self._notifyOnDisconnectors = {} def startService(self): service.MultiService.startService(self) @@ -26,6 +45,28 @@ class LogPublisher(Referenceable, service.MultiService): self.parent.tub.registerReference(self, furlFile=furlfile) os.chmod(furlfile, 0600) + log.addObserver(self._twisted_log_observer) + + def stopService(self): + log.removeObserver(self._twisted_log_observer) + return service.MultiService.stopService(self) + + def _twisted_log_observer(self, d): + # Twisted will remove this for us if it fails. + + # keys: + # ['message']: *args + # ['time']: float + # ['isError']: bool, usually False + # ['system']: string + + for o in self._subscribers.values(): + o.callRemoteOnly("msg", d) + + #f = open("/tmp/f.out", "a") + #print >>f, d['message'] + #f.close() + def remote_get_versions(self): versions = self.parent.get_versions() # our __version__ attributes are actually instances of @@ -34,3 +75,15 @@ class LogPublisher(Referenceable, service.MultiService): return dict([(k,str(v)) for k,v in versions.items()]) + def remote_subscribe_to_all(self, observer): + s = Subscription() + self._subscribers[s] = observer + c = observer.notifyOnDisconnect(self.remote_unsubscribe, s) + self._notifyOnDisconnectors[s] = c + return s + + def remote_unsubscribe(self, s): + observer = self._subscribers.pop(s) + c = self._notifyOnDisconnectors.pop(s) + observer.dontNotifyOnDisconnect(c) + diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py index e5597bad..c66a76f3 100644 --- a/src/allmydata/test/test_node.py +++ b/src/allmydata/test/test_node.py @@ -6,7 +6,7 @@ from twisted.internet import defer from twisted.python import log from foolscap import Tub, Referenceable -from foolscap.eventual import flushEventualQueue +from foolscap.eventual import fireEventually, flushEventualQueue from twisted.application import service import allmydata from allmydata.node import Node, formatTimeTahoeStyle @@ -61,6 +61,7 @@ class TestCase(unittest.TestCase, testutil.SignalMixin): def test_logpublisher(self): basedir = "test_node/test_logpublisher" fileutil.make_dirs(basedir) + observer = LogObserver() n = TestNode(basedir) n.setServiceParent(self.parent) d = n.when_tub_ready() @@ -75,6 +76,23 @@ class TestCase(unittest.TestCase, testutil.SignalMixin): self.failUnlessEqual(versions["allmydata"], allmydata.__version__) d.addCallback(_check) + d.addCallback(lambda res: + logport.callRemote("subscribe_to_all", observer)) + def _emit(subscription): + self._subscription = subscription + log.msg("message here") + d.addCallback(_emit) + d.addCallback(fireEventually) + d.addCallback(fireEventually) + def _check_observer(res): + msgs = observer.messages + self.failUnlessEqual(len(msgs), 1) + #print msgs + self.failUnlessEqual(msgs[0]["message"], ("message here",) ) + d.addCallback(_check_observer) + def _done(res): + return logport.callRemote("unsubscribe", self._subscription) + d.addCallback(_done) return d d.addCallback(_got_logport) return d @@ -122,3 +140,10 @@ class Gatherer(Referenceable): def remote_logport(self, nodeid, logport): d = logport.callRemote("get_versions") d.addCallback(self.d.callback) + +class LogObserver(Referenceable): + implements(logpublisher.RILogObserver) + def __init__(self): + self.messages = [] + def remote_msg(self, d): + self.messages.append(d)