#! /usr/bin/python
-import sys
+import os.path, time, pickle
import foolscap
+from foolscap import RemoteInterface
+from foolscap.eventual import fireEventually
from foolscap.schema import DictOf, Any
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from zope.interface import implements
-from twisted.python import log
+from twisted.python import usage
+#from twisted.python import log
+#import sys
#log.startLogging(sys.stderr)
+class Options(usage.Options):
+ longdesc = """
+ logtool tail FURL : follow logs of the target node
+ logtool gather : run as a daemon, record all logs to the current directory
+ logtool dump FILE : dump the logs recorded by 'logtool gather'
+ """
-class RILogObserver(foolscap.RemoteInterface):
+ def parseArgs(self, mode, *args):
+ self.mode = mode
+ if mode == "tail":
+ target = args[0]
+ if target.startswith("pb:"):
+ self.target_furl = target
+ elif os.path.isfile(target):
+ self.target_furl = open(target, "r").read().strip()
+ elif os.path.isdir(target):
+ fn = os.path.join(target, "logport.furl")
+ self.target_furl = open(fn, "r").read().strip()
+ else:
+ raise RuntimeError("Can't use tail target: %s" % target)
+ elif mode == "dump":
+ self.dumpfile = args[0]
+
+
+class RILogObserver(RemoteInterface):
def msg(logmsg=DictOf(str, Any())):
return None
+class RISubscription(RemoteInterface):
+ pass
-class LogFetcher(foolscap.Referenceable):
+class RILogPublisher(RemoteInterface):
+ def get_versions():
+ return DictOf(str, str)
+ def subscribe_to_all(observer=RILogObserver):
+ return RISubscription
+ def unsubscribe(subscription=Any()):
+ # I don't know how to get the constraint right: unsubscribe() should
+ # accept return value of subscribe_to_all()
+ return None
+
+class RILogGatherer(RemoteInterface):
+ def logport(nodeid=str, logport=RILogPublisher):
+ return None
+
+class LogPrinter(foolscap.Referenceable):
implements(RILogObserver)
+
+ def remote_msg(self, d):
+ print d
+
+class LogTail:
+
def start(self, target_furl):
print "Connecting.."
+ d = defer.maybeDeferred(self.setup_tub)
+ d.addCallback(self._tub_ready, target_furl)
+ return d
+
+ def setup_tub(self):
self._tub = foolscap.Tub()
self._tub.startService()
+
+ def _tub_ready(self, res, target_furl):
d = self._tub.getReference(target_furl)
d.addCallback(self._got_logpublisher)
- d.addErrback(self._error)
-
- def _error(self, f):
- print "ERROR", f
- reactor.stop()
+ return d
def _got_logpublisher(self, publisher):
print "Connected"
- d = publisher.callRemote("subscribe_to_all", self)
- d.addErrback(self._error)
+ lp = LogPrinter()
+ d = publisher.callRemote("subscribe_to_all", lp)
+ return d
def remote_msg(self, d):
print d
+class LogSaver(foolscap.Referenceable):
+ implements(RILogObserver)
+ def __init__(self, nodeid, savefile):
+ self.nodeid = nodeid
+ self.f = savefile
+
+ def remote_msg(self, d):
+ e = {"from": self.nodeid,
+ "rx_time": time.time(),
+ "d": d,
+ }
+ pickle.dump(e, self.f)
+
+ def disconnected(self):
+ del self.f
+ from allmydata.util.idlib import shortnodeid_b2a
+ print "LOGPORT CLOSED", shortnodeid_b2a(self.nodeid)
+
+class LogGatherer(foolscap.Referenceable):
+ implements(RILogGatherer)
+
+ def start(self, res):
+ self._savefile = open("logs.pickle", "ab", 0)
+ d = self.setup_tub()
+ d.addCallback(self._tub_ready)
+ return d
+
+ def setup_tub(self):
+ from allmydata.util import iputil
+ self._tub = foolscap.Tub(certFile="gatherer.pem")
+ self._tub.startService()
+ portnumfile = "portnum"
+ try:
+ portnum = int(open(portnumfile, "r").read())
+ except (EnvironmentError, ValueError):
+ portnum = 0
+ self._tub.listenOn("tcp:%d" % portnum)
+ d = defer.maybeDeferred(iputil.get_local_addresses_async)
+ d.addCallback(self._set_location)
+ return d
+
+ def _set_location(self, local_addresses):
+ l = self._tub.getListeners()[0]
+ portnum = l.getPortnum()
+ portnumfile = "portnum"
+ open(portnumfile, "w").write("%d\n" % portnum)
+ local_addresses = [ "%s:%d" % (addr, portnum,)
+ for addr in local_addresses ]
+ location = ",".join(local_addresses)
+ self._tub.setLocation(location)
+
+ def _tub_ready(self, res):
+ me = self._tub.registerReference(self, furlFile="log_gatherer.furl")
+ print "Gatherer waiting at:", me
+
+ def remote_logport(self, nodeid, publisher):
+ from allmydata.util.idlib import shortnodeid_b2a
+ short = shortnodeid_b2a(nodeid)
+ print "GOT LOGPORT", short
+ ls = LogSaver(nodeid, self._savefile)
+ publisher.callRemote("subscribe_to_all", ls)
+ publisher.notifyOnDisconnect(ls.disconnected)
+
+class LogDumper:
+ def start(self, options):
+ from allmydata.util.idlib import shortnodeid_b2a
+ fn = options.dumpfile
+ f = open(fn, "rb")
+ while True:
+ try:
+ e = pickle.load(f)
+ short = shortnodeid_b2a(e['from'])
+ when = e['rx_time']
+ print "%s %r: %r" % (short, when, e['d'])
+ except EOFError:
+ break
+
+class LogTool:
+
+ def run(self, options):
+ mode = options.mode
+ if mode == "tail":
+ lt = LogTail()
+ d = fireEventually(options.target_furl)
+ d.addCallback(lt.start)
+ d.addErrback(self._error)
+ print "starting.."
+ reactor.run()
+ elif mode == "gather":
+ lg = LogGatherer()
+ d = fireEventually()
+ d.addCallback(lg.start)
+ d.addErrback(self._error)
+ print "starting.."
+ reactor.run()
+ elif mode == "dump":
+ ld = LogDumper()
+ ld.start(options)
+ else:
+ print "unknown mode '%s'" % mode
+ raise NotImplementedError
+
+ def _error(self, f):
+ print "ERROR", f
+ reactor.stop()
-target_furl = sys.argv[1]
-lf = LogFetcher()
-lf.start(target_furl)
-#print "starting.."
-reactor.run()
+if __name__ == '__main__':
+ o = Options()
+ o.parseOptions()
+ lt = LogTool()
+ lt.run(o)