From c86e803282d8ef109ff37e9a2e094b9191b9d917 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Sun, 18 Nov 2007 18:32:04 -0700 Subject: [PATCH] logtool: add 'gather' and 'dump' modes --- misc/logtool.py | 192 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 175 insertions(+), 17 deletions(-) diff --git a/misc/logtool.py b/misc/logtool.py index 4471d4a0..ff61b6ab 100644 --- a/misc/logtool.py +++ b/misc/logtool.py @@ -1,43 +1,201 @@ #! /usr/bin/python -import sys +import os.path, time, pickle import foolscap +from foolscap import RemoteInterface +from foolscap.eventual import fireEventually from foolscap.schema import DictOf, Any -from twisted.internet import reactor +from twisted.internet import reactor, defer from zope.interface import implements -from twisted.python import log +from twisted.python import usage +#from twisted.python import log +#import sys #log.startLogging(sys.stderr) +class Options(usage.Options): + longdesc = """ + logtool tail FURL : follow logs of the target node + logtool gather : run as a daemon, record all logs to the current directory + logtool dump FILE : dump the logs recorded by 'logtool gather' + """ -class RILogObserver(foolscap.RemoteInterface): + def parseArgs(self, mode, *args): + self.mode = mode + if mode == "tail": + target = args[0] + if target.startswith("pb:"): + self.target_furl = target + elif os.path.isfile(target): + self.target_furl = open(target, "r").read().strip() + elif os.path.isdir(target): + fn = os.path.join(target, "logport.furl") + self.target_furl = open(fn, "r").read().strip() + else: + raise RuntimeError("Can't use tail target: %s" % target) + elif mode == "dump": + self.dumpfile = args[0] + + +class RILogObserver(RemoteInterface): def msg(logmsg=DictOf(str, Any())): return None +class RISubscription(RemoteInterface): + pass -class LogFetcher(foolscap.Referenceable): +class RILogPublisher(RemoteInterface): + def get_versions(): + return DictOf(str, str) + def subscribe_to_all(observer=RILogObserver): + return RISubscription + def unsubscribe(subscription=Any()): + # I don't know how to get the constraint right: unsubscribe() should + # accept return value of subscribe_to_all() + return None + +class RILogGatherer(RemoteInterface): + def logport(nodeid=str, logport=RILogPublisher): + return None + +class LogPrinter(foolscap.Referenceable): implements(RILogObserver) + + def remote_msg(self, d): + print d + +class LogTail: + def start(self, target_furl): print "Connecting.." + d = defer.maybeDeferred(self.setup_tub) + d.addCallback(self._tub_ready, target_furl) + return d + + def setup_tub(self): self._tub = foolscap.Tub() self._tub.startService() + + def _tub_ready(self, res, target_furl): d = self._tub.getReference(target_furl) d.addCallback(self._got_logpublisher) - d.addErrback(self._error) - - def _error(self, f): - print "ERROR", f - reactor.stop() + return d def _got_logpublisher(self, publisher): print "Connected" - d = publisher.callRemote("subscribe_to_all", self) - d.addErrback(self._error) + lp = LogPrinter() + d = publisher.callRemote("subscribe_to_all", lp) + return d def remote_msg(self, d): print d +class LogSaver(foolscap.Referenceable): + implements(RILogObserver) + def __init__(self, nodeid, savefile): + self.nodeid = nodeid + self.f = savefile + + def remote_msg(self, d): + e = {"from": self.nodeid, + "rx_time": time.time(), + "d": d, + } + pickle.dump(e, self.f) + + def disconnected(self): + del self.f + from allmydata.util.idlib import shortnodeid_b2a + print "LOGPORT CLOSED", shortnodeid_b2a(self.nodeid) + +class LogGatherer(foolscap.Referenceable): + implements(RILogGatherer) + + def start(self, res): + self._savefile = open("logs.pickle", "ab", 0) + d = self.setup_tub() + d.addCallback(self._tub_ready) + return d + + def setup_tub(self): + from allmydata.util import iputil + self._tub = foolscap.Tub(certFile="gatherer.pem") + self._tub.startService() + portnumfile = "portnum" + try: + portnum = int(open(portnumfile, "r").read()) + except (EnvironmentError, ValueError): + portnum = 0 + self._tub.listenOn("tcp:%d" % portnum) + d = defer.maybeDeferred(iputil.get_local_addresses_async) + d.addCallback(self._set_location) + return d + + def _set_location(self, local_addresses): + l = self._tub.getListeners()[0] + portnum = l.getPortnum() + portnumfile = "portnum" + open(portnumfile, "w").write("%d\n" % portnum) + local_addresses = [ "%s:%d" % (addr, portnum,) + for addr in local_addresses ] + location = ",".join(local_addresses) + self._tub.setLocation(location) + + def _tub_ready(self, res): + me = self._tub.registerReference(self, furlFile="log_gatherer.furl") + print "Gatherer waiting at:", me + + def remote_logport(self, nodeid, publisher): + from allmydata.util.idlib import shortnodeid_b2a + short = shortnodeid_b2a(nodeid) + print "GOT LOGPORT", short + ls = LogSaver(nodeid, self._savefile) + publisher.callRemote("subscribe_to_all", ls) + publisher.notifyOnDisconnect(ls.disconnected) + +class LogDumper: + def start(self, options): + from allmydata.util.idlib import shortnodeid_b2a + fn = options.dumpfile + f = open(fn, "rb") + while True: + try: + e = pickle.load(f) + short = shortnodeid_b2a(e['from']) + when = e['rx_time'] + print "%s %r: %r" % (short, when, e['d']) + except EOFError: + break + +class LogTool: + + def run(self, options): + mode = options.mode + if mode == "tail": + lt = LogTail() + d = fireEventually(options.target_furl) + d.addCallback(lt.start) + d.addErrback(self._error) + print "starting.." + reactor.run() + elif mode == "gather": + lg = LogGatherer() + d = fireEventually() + d.addCallback(lg.start) + d.addErrback(self._error) + print "starting.." + reactor.run() + elif mode == "dump": + ld = LogDumper() + ld.start(options) + else: + print "unknown mode '%s'" % mode + raise NotImplementedError + + def _error(self, f): + print "ERROR", f + reactor.stop() -target_furl = sys.argv[1] -lf = LogFetcher() -lf.start(target_furl) -#print "starting.." -reactor.run() +if __name__ == '__main__': + o = Options() + o.parseOptions() + lt = LogTool() + lt.run(o) -- 2.45.2