--- /dev/null
+#! /usr/bin/python
+
+import sys
+import foolscap
+from foolscap.schema import DictOf, Any
+from twisted.internet import reactor
+from zope.interface import implements
+from twisted.python import log
+#log.startLogging(sys.stderr)
+
+
+class RILogObserver(foolscap.RemoteInterface):
+ def msg(logmsg=DictOf(str, Any())):
+ return None
+
+class LogFetcher(foolscap.Referenceable):
+ implements(RILogObserver)
+ def start(self, target_furl):
+ print "Connecting.."
+ self._tub = foolscap.Tub()
+ self._tub.startService()
+ d = self._tub.getReference(target_furl)
+ d.addCallback(self._got_logpublisher)
+ d.addErrback(self._error)
+
+ def _error(self, f):
+ print "ERROR", f
+ reactor.stop()
+
+ def _got_logpublisher(self, publisher):
+ print "Connected"
+ d = publisher.callRemote("subscribe_to_all", self)
+ d.addErrback(self._error)
+
+ def remote_msg(self, d):
+ print d
+
+
+target_furl = sys.argv[1]
+lf = LogFetcher()
+lf.start(target_furl)
+#print "starting.."
+reactor.run()
import os.path
from zope.interface import implements
from twisted.application import service
+from twisted.python import log
from foolscap import Referenceable, RemoteInterface
-from foolscap.schema import DictOf
+from foolscap.schema import DictOf, Any
+from foolscap.eventual import eventually
+
+class RILogObserver(RemoteInterface):
+ def msg(logmsg=DictOf(str, Any())):
+ return None
+class RISubscription(RemoteInterface):
+ pass
class RILogPublisher(RemoteInterface):
def get_versions():
return DictOf(str, str)
+ def subscribe_to_all(observer=RILogObserver):
+ return RISubscription
+ def unsubscribe(subscription=Any()):
+ # I don't know how to get the constraint right: unsubscribe() should
+ # accept return value of subscribe_to_all()
+ return None
class RILogGatherer(RemoteInterface):
def logport(nodeid=str, logport=RILogPublisher):
return None
+class Subscription(Referenceable):
+ implements(RISubscription)
+
class LogPublisher(Referenceable, service.MultiService):
implements(RILogPublisher)
name = "log_publisher"
def __init__(self):
service.MultiService.__init__(self)
+ self._subscribers = {}
+ self._notifyOnDisconnectors = {}
def startService(self):
service.MultiService.startService(self)
self.parent.tub.registerReference(self, furlFile=furlfile)
os.chmod(furlfile, 0600)
+ log.addObserver(self._twisted_log_observer)
+
+ def stopService(self):
+ log.removeObserver(self._twisted_log_observer)
+ return service.MultiService.stopService(self)
+
+ def _twisted_log_observer(self, d):
+ # Twisted will remove this for us if it fails.
+
+ # keys:
+ # ['message']: *args
+ # ['time']: float
+ # ['isError']: bool, usually False
+ # ['system']: string
+
+ for o in self._subscribers.values():
+ o.callRemoteOnly("msg", d)
+
+ #f = open("/tmp/f.out", "a")
+ #print >>f, d['message']
+ #f.close()
+
def remote_get_versions(self):
versions = self.parent.get_versions()
# our __version__ attributes are actually instances of
return dict([(k,str(v))
for k,v in versions.items()])
+ def remote_subscribe_to_all(self, observer):
+ s = Subscription()
+ self._subscribers[s] = observer
+ c = observer.notifyOnDisconnect(self.remote_unsubscribe, s)
+ self._notifyOnDisconnectors[s] = c
+ return s
+
+ def remote_unsubscribe(self, s):
+ observer = self._subscribers.pop(s)
+ c = self._notifyOnDisconnectors.pop(s)
+ observer.dontNotifyOnDisconnect(c)
+
from twisted.python import log
from foolscap import Tub, Referenceable
-from foolscap.eventual import flushEventualQueue
+from foolscap.eventual import fireEventually, flushEventualQueue
from twisted.application import service
import allmydata
from allmydata.node import Node, formatTimeTahoeStyle
def test_logpublisher(self):
basedir = "test_node/test_logpublisher"
fileutil.make_dirs(basedir)
+ observer = LogObserver()
n = TestNode(basedir)
n.setServiceParent(self.parent)
d = n.when_tub_ready()
self.failUnlessEqual(versions["allmydata"],
allmydata.__version__)
d.addCallback(_check)
+ d.addCallback(lambda res:
+ logport.callRemote("subscribe_to_all", observer))
+ def _emit(subscription):
+ self._subscription = subscription
+ log.msg("message here")
+ d.addCallback(_emit)
+ d.addCallback(fireEventually)
+ d.addCallback(fireEventually)
+ def _check_observer(res):
+ msgs = observer.messages
+ self.failUnlessEqual(len(msgs), 1)
+ #print msgs
+ self.failUnlessEqual(msgs[0]["message"], ("message here",) )
+ d.addCallback(_check_observer)
+ def _done(res):
+ return logport.callRemote("unsubscribe", self._subscription)
+ d.addCallback(_done)
return d
d.addCallback(_got_logport)
return d
def remote_logport(self, nodeid, logport):
d = logport.callRemote("get_versions")
d.addCallback(self.d.callback)
+
+class LogObserver(Referenceable):
+ implements(logpublisher.RILogObserver)
+ def __init__(self):
+ self.messages = []
+ def remote_msg(self, d):
+ self.messages.append(d)