#!/usr/bin/env python

import os.path, time, pickle
import foolscap
from foolscap import RemoteInterface
from foolscap.eventual import fireEventually
from foolscap.schema import DictOf, Any
from twisted.internet import reactor, defer
from zope.interface import implements
from twisted.python import usage
#from twisted.python import log
#import sys
#log.startLogging(sys.stderr)

class Options(usage.Options):
    longdesc = """
    logtool tail FURL : follow logs of the target node
    logtool gather : run as a daemon, record all logs to the current directory
    logtool dump FILE : dump the logs recorded by 'logtool gather'
    """

    def parseArgs(self, mode, *args):
        self.mode = mode
        if mode == "tail":
            target = args[0]
            if target.startswith("pb:"):
                self.target_furl = target
            elif os.path.isfile(target):
                self.target_furl = open(target, "r").read().strip()
            elif os.path.isdir(target):
                fn = os.path.join(target, "logport.furl")
                self.target_furl = open(fn, "r").read().strip()
            else:
                raise ValueError("Can't use tail target: %s" % target)
        elif mode == "dump":
            self.dumpfile = args[0]

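# Foolscap RemoteInterface declarations for the node's log port: an observer
# that receives log messages, a subscription handle, the publisher reachable
# through a node's logport.furl, and the gatherer interface that nodes call
# to announce their log ports.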
class RILogObserver(RemoteInterface):
    def msg(logmsg=DictOf(str, Any())):
        return None

class RISubscription(RemoteInterface):
    pass

class RILogPublisher(RemoteInterface):
    def get_versions():
        return DictOf(str, str)
    def subscribe_to_all(observer=RILogObserver):
        return RISubscription
    def unsubscribe(subscription=Any()):
        # I don't know how to get the constraint right: unsubscribe() should
        # accept the return value of subscribe_to_all()
        return None

class RILogGatherer(RemoteInterface):
    def logport(nodeid=str, logport=RILogPublisher):
        return None

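# 'tail' mode observer: print each received log message to stdout.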
class LogPrinter(foolscap.Referenceable):
    implements(RILogObserver)

    def remote_msg(self, d):
        print d

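# LogTail drives 'tail' mode: start a Tub, connect to the target log port's
# FURL, and subscribe a LogPrinter so every log message gets printed.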
class LogTail:

    def start(self, target_furl):
        print "Connecting.."
        d = defer.maybeDeferred(self.setup_tub)
        d.addCallback(self._tub_ready, target_furl)
        return d

    def setup_tub(self):
        self._tub = foolscap.Tub()
        self._tub.startService()

    def _tub_ready(self, res, target_furl):
        d = self._tub.getReference(target_furl)
        d.addCallback(self._got_logpublisher)
        return d

    def _got_logpublisher(self, publisher):
        print "Connected"
        lp = LogPrinter()
        d = publisher.callRemote("subscribe_to_all", lp)
        return d

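# 'gather' mode observer: each message is wrapped with the sender's nodeid and
# a local receipt timestamp, then appended to the shared pickle file.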
class LogSaver(foolscap.Referenceable):
    implements(RILogObserver)

    def __init__(self, nodeid, savefile):
        self.nodeid = nodeid
        self.f = savefile

    def remote_msg(self, d):
        e = {"from": self.nodeid,
             "rx_time": time.time(),
             "d": d,
             }
        pickle.dump(e, self.f)

    def disconnected(self):
        del self.f
        from allmydata.util.idlib import shortnodeid_b2a
        print "LOGPORT CLOSED", shortnodeid_b2a(self.nodeid)

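# LogGatherer runs as a daemon: it listens on a TCP port (remembered in the
# 'portnum' file), writes its own FURL to log_gatherer.furl, and for every
# node that announces its log port via remote_logport() it subscribes a
# LogSaver that appends messages to logs.pickle.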
class LogGatherer(foolscap.Referenceable):
    implements(RILogGatherer)

    def start(self, res):
        self._savefile = open("logs.pickle", "ab", 0)
        d = self.setup_tub()
        d.addCallback(self._tub_ready)
        return d

    def setup_tub(self):
        from allmydata.util import iputil
        self._tub = foolscap.Tub(certFile="gatherer.pem")
        self._tub.startService()
        portnumfile = "portnum"
        try:
            portnum = int(open(portnumfile, "r").read())
        except (EnvironmentError, ValueError):
            portnum = 0
        self._tub.listenOn("tcp:%d" % portnum)
        d = defer.maybeDeferred(iputil.get_local_addresses_async)
        d.addCallback(self._set_location)
        return d

    def _set_location(self, local_addresses):
        l = self._tub.getListeners()[0]
        portnum = l.getPortnum()
        portnumfile = "portnum"
        open(portnumfile, "w").write("%d\n" % portnum)
        local_addresses = [ "%s:%d" % (addr, portnum,)
                            for addr in local_addresses ]
        location = ",".join(local_addresses)
        self._tub.setLocation(location)

    def _tub_ready(self, res):
        me = self._tub.registerReference(self, furlFile="log_gatherer.furl")
        print "Gatherer waiting at:", me

    def remote_logport(self, nodeid, publisher):
        from allmydata.util.idlib import shortnodeid_b2a
        short = shortnodeid_b2a(nodeid)
        print "GOT LOGPORT", short
        ls = LogSaver(nodeid, self._savefile)
        publisher.callRemote("subscribe_to_all", ls)
        publisher.notifyOnDisconnect(ls.disconnected)

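# 'dump' mode: read the pickle stream written by LogGatherer and print one
# line per recorded message.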
class LogDumper:
    def start(self, options):
        from allmydata.util.idlib import shortnodeid_b2a
        fn = options.dumpfile
        f = open(fn, "rb")
        while True:
            try:
                e = pickle.load(f)
                short = shortnodeid_b2a(e['from'])
                when = e['rx_time']
                print "%s %r: %r" % (short, when, e['d'])
            except EOFError:
                break

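# Command dispatcher: 'tail' and 'gather' need a running reactor, so their
# setup is scheduled with fireEventually(); 'dump' runs synchronously.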
class LogTool:

    def run(self, options):
        mode = options.mode
        if mode == "tail":
            lt = LogTail()
            d = fireEventually(options.target_furl)
            d.addCallback(lt.start)
            d.addErrback(self._error)
            print "starting.."
            reactor.run()
        elif mode == "gather":
            lg = LogGatherer()
            d = fireEventually()
            d.addCallback(lg.start)
            d.addErrback(self._error)
            print "starting.."
            reactor.run()
        elif mode == "dump":
            ld = LogDumper()
            ld.start(options)
        else:
            print "unknown mode '%s'" % mode
            raise NotImplementedError

    def _error(self, f):
        print "ERROR", f
        reactor.stop()

if __name__ == '__main__':
    o = Options()
    o.parseOptions()
    lt = LogTool()
    lt.run(o)