]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/stats.py
If a stats.pickle file cannot be read, print a better error message.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / stats.py
index 7e3c64f6fc5a35cb1c4ed3f1f3dc39fd04eba8e2..ea0a7f85d850dacd74584ecdfa3dd2e5e08d8155 100644 (file)
@@ -2,18 +2,17 @@
 import os
 import pickle
 import pprint
-import sys
 import time
 from collections import deque
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor
 from twisted.application import service
 from twisted.application.internet import TimerService
 from zope.interface import implements
-import foolscap
-from foolscap.logging.gatherer import get_local_ip_for
+from foolscap.api import eventually, DeadReferenceError, Referenceable, Tub
 
 from allmydata.util import log
+from allmydata.util.encodingutil import quote_output
 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
 
 class LoadMonitor(service.MultiService):
@@ -26,21 +25,27 @@ class LoadMonitor(service.MultiService):
         service.MultiService.__init__(self)
         self.provider = provider
         self.warn_if_delay_exceeds = warn_if_delay_exceeds
-        self.running = False
+        self.started = False
         self.last = None
         self.stats = deque()
+        self.timer = None
 
     def startService(self):
-        if not self.running:
-            self.running = True
-            reactor.callLater(self.loop_interval, self.loop)
+        if not self.started:
+            self.started = True
+            self.timer = reactor.callLater(self.loop_interval, self.loop)
         service.MultiService.startService(self)
 
     def stopService(self):
-        self.running = False
+        self.started = False
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        return service.MultiService.stopService(self)
 
     def loop(self):
-        if not self.running:
+        self.timer = None
+        if not self.started:
             return
         now = time.time()
         if self.last is not None:
@@ -53,7 +58,7 @@ class LoadMonitor(service.MultiService):
                 self.stats.popleft()
 
         self.last = now
-        reactor.callLater(self.loop_interval, self.loop)
+        self.timer = reactor.callLater(self.loop_interval, self.loop)
 
     def get_stats(self):
         if self.stats:
@@ -64,67 +69,132 @@ class LoadMonitor(service.MultiService):
         return { 'load_monitor.avg_load': avg,
                  'load_monitor.max_load': m_x, }
 
-class StatsProvider(foolscap.Referenceable, service.MultiService):
+class CPUUsageMonitor(service.MultiService):
+    implements(IStatsProducer)
+    HISTORY_LENGTH = 15
+    POLL_INTERVAL = 60
+
+    def __init__(self):
+        service.MultiService.__init__(self)
+        # we don't use time.clock() here, because the constructor is run by
+        # the twistd parent process (as it loads the .tac file), whereas the
+        # rest of the program will be run by the child process, after twistd
+        # forks. Instead, set self.initial_cpu as soon as the reactor starts
+        # up.
+        self.initial_cpu = 0.0 # just in case
+        eventually(self._set_initial_cpu)
+        self.samples = []
+        # we provide 1min, 5min, and 15min moving averages
+        TimerService(self.POLL_INTERVAL, self.check).setServiceParent(self)
+
+    def _set_initial_cpu(self):
+        self.initial_cpu = time.clock()
+
+    def check(self):
+        now_wall = time.time()
+        now_cpu = time.clock()
+        self.samples.append( (now_wall, now_cpu) )
+        while len(self.samples) > self.HISTORY_LENGTH+1:
+            self.samples.pop(0)
+
+    def _average_N_minutes(self, size):
+        if len(self.samples) < size+1:
+            return None
+        first = -size-1
+        elapsed_wall = self.samples[-1][0] - self.samples[first][0]
+        elapsed_cpu = self.samples[-1][1] - self.samples[first][1]
+        fraction = elapsed_cpu / elapsed_wall
+        return fraction
+
+    def get_stats(self):
+        s = {}
+        avg = self._average_N_minutes(1)
+        if avg is not None:
+            s["cpu_monitor.1min_avg"] = avg
+        avg = self._average_N_minutes(5)
+        if avg is not None:
+            s["cpu_monitor.5min_avg"] = avg
+        avg = self._average_N_minutes(15)
+        if avg is not None:
+            s["cpu_monitor.15min_avg"] = avg
+        now_cpu = time.clock()
+        s["cpu_monitor.total"] = now_cpu - self.initial_cpu
+        return s
+
+
+class StatsProvider(Referenceable, service.MultiService):
     implements(RIStatsProvider)
 
     def __init__(self, node, gatherer_furl):
         service.MultiService.__init__(self)
         self.node = node
-        self.gatherer_furl = gatherer_furl
+        self.gatherer_furl = gatherer_furl # might be None
 
         self.counters = {}
         self.stats_producers = []
 
-        self.load_monitor = LoadMonitor(self)
-        self.load_monitor.setServiceParent(self)
-        self.register_producer(self.load_monitor)
+        # only run the LoadMonitor (which submits a timer every second) if
+        # there is a gatherer who is going to be paying attention. Our stats
+        # are visible through HTTP even without a gatherer, so run the rest
+        # of the stats (including the once-per-minute CPUUsageMonitor)
+        if gatherer_furl:
+            self.load_monitor = LoadMonitor(self)
+            self.load_monitor.setServiceParent(self)
+            self.register_producer(self.load_monitor)
+
+        self.cpu_monitor = CPUUsageMonitor()
+        self.cpu_monitor.setServiceParent(self)
+        self.register_producer(self.cpu_monitor)
 
     def startService(self):
-        if self.node:
+        if self.node and self.gatherer_furl:
             d = self.node.when_tub_ready()
             def connect(junk):
-                nickname = self.node.get_config('nickname')
-                self.node.tub.connectTo(self.gatherer_furl, self._connected, nickname)
+                nickname_utf8 = self.node.nickname.encode("utf-8")
+                self.node.tub.connectTo(self.gatherer_furl,
+                                        self._connected, nickname_utf8)
             d.addCallback(connect)
+        service.MultiService.startService(self)
 
-    def count(self, name, delta):
+    def count(self, name, delta=1):
         val = self.counters.setdefault(name, 0)
         self.counters[name] = val + delta
 
     def register_producer(self, stats_producer):
         self.stats_producers.append(IStatsProducer(stats_producer))
 
-    def remote_get_stats(self):
+    def get_stats(self):
         stats = {}
         for sp in self.stats_producers:
             stats.update(sp.get_stats())
-        return { 'counters': self.counters, 'stats': stats }
+        ret = { 'counters': self.counters, 'stats': stats }
+        log.msg(format='get_stats() -> %(stats)s', stats=ret, level=log.NOISY)
+        return ret
+
+    def remote_get_stats(self):
+        return self.get_stats()
 
     def _connected(self, gatherer, nickname):
-        gatherer.callRemote('provide', self, nickname or '')
+        gatherer.callRemoteOnly('provide', self, nickname or '')
 
-class StatsGatherer(foolscap.Referenceable, service.MultiService):
+
+class StatsGatherer(Referenceable, service.MultiService):
     implements(RIStatsGatherer)
 
     poll_interval = 60
 
-    def __init__(self, tub):
+    def __init__(self, basedir):
         service.MultiService.__init__(self)
-        self.tub = tub
+        self.basedir = basedir
 
         self.clients = {}
         self.nicknames = {}
 
-    def startService(self):
         self.timer = TimerService(self.poll_interval, self.poll)
         self.timer.setServiceParent(self)
-        service.MultiService.startService(self)
-
-    def get_furl(self):
-        return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
 
     def get_tubid(self, rref):
-        return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
+        return rref.getRemoteTubID()
 
     def remote_provide(self, provider, nickname):
         tubid = self.get_tubid(provider)
@@ -134,44 +204,61 @@ class StatsGatherer(foolscap.Referenceable, service.MultiService):
             return
         self.clients[tubid] = provider
         self.nicknames[tubid] = nickname
-        provider.notifyOnDisconnect(self.lost_client, tubid)
-
-    def lost_client(self, tubid):
-        del self.clients[tubid]
-        del self.nicknames[tubid]
 
     def poll(self):
         for tubid,client in self.clients.items():
             nickname = self.nicknames.get(tubid)
             d = client.callRemote('get_stats')
-            d.addCallback(self.got_stats, tubid, nickname)
+            d.addCallbacks(self.got_stats, self.lost_client,
+                           callbackArgs=(tubid, nickname),
+                           errbackArgs=(tubid,))
+            d.addErrback(self.log_client_error, tubid)
+
+    def lost_client(self, f, tubid):
+        # this is called lazily, when a get_stats request fails
+        del self.clients[tubid]
+        del self.nicknames[tubid]
+        f.trap(DeadReferenceError)
+
+    def log_client_error(self, f, tubid):
+        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
+                level=log.UNUSUAL, failure=f)
 
     def got_stats(self, stats, tubid, nickname):
         raise NotImplementedError()
 
 class StdOutStatsGatherer(StatsGatherer):
+    verbose = True
     def remote_provide(self, provider, nickname):
         tubid = self.get_tubid(provider)
-        print 'connect "%s" [%s]' % (nickname, tubid)
+        if self.verbose:
+            print 'connect "%s" [%s]' % (nickname, tubid)
+            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
         StatsGatherer.remote_provide(self, provider, nickname)
 
-    def lost_client(self, tubid):
-        print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
-        StatsGatherer.lost_client(self, tubid)
+    def announce_lost_client(self, tubid):
+        print 'disconnect "%s" [%s]' % (self.nicknames[tubid], tubid)
 
     def got_stats(self, stats, tubid, nickname):
         print '"%s" [%s]:' % (nickname, tubid)
         pprint.pprint(stats)
 
-class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications;
-#class PickleStatsGatherer(StatsGatherer):
-    def __init__(self, tub, picklefile):
-        StatsGatherer.__init__(self, tub)
-        self.picklefile = picklefile
+class PickleStatsGatherer(StdOutStatsGatherer):
+    # inherit from StdOutStatsGatherer for connect/disconnect notifications
+
+    def __init__(self, basedir=".", verbose=True):
+        self.verbose = verbose
+        StatsGatherer.__init__(self, basedir)
+        self.picklefile = os.path.join(basedir, "stats.pickle")
 
-        if os.path.exists(picklefile):
-            f = open(picklefile, 'rb')
-            self.gathered_stats = pickle.load(f)
+        if os.path.exists(self.picklefile):
+            f = open(self.picklefile, 'rb')
+            try:
+                self.gathered_stats = pickle.load(f)
+            except Exception:
+                print ("Error while attempting to load pickle file %s.\nYou may need to delete this file.\n" %
+                       quote_output(os.path.abspath(self.picklefile)))
+                raise
             f.close()
         else:
             self.gathered_stats = {}
@@ -192,51 +279,40 @@ class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notific
             os.unlink(self.picklefile)
         os.rename(tmp, self.picklefile)
 
-class GathererApp(object):
-    def __init__(self):
-        d = self.setup_tub()
-        d.addCallback(self._tub_ready)
-
-    def setup_tub(self):
-        self._tub = foolscap.Tub(certFile="stats_gatherer.pem")
-        self._tub.setOption("logLocalFailures", True)
-        self._tub.setOption("logRemoteFailures", True)
-        self._tub.startService()
-        portnumfile = "portnum"
+class StatsGathererService(service.MultiService):
+    furl_file = "stats_gatherer.furl"
+
+    def __init__(self, basedir=".", verbose=False):
+        service.MultiService.__init__(self)
+        self.basedir = basedir
+        self.tub = Tub(certFile=os.path.join(self.basedir,
+                                             "stats_gatherer.pem"))
+        self.tub.setServiceParent(self)
+        self.tub.setOption("logLocalFailures", True)
+        self.tub.setOption("logRemoteFailures", True)
+        self.tub.setOption("expose-remote-exception-types", False)
+
+        self.stats_gatherer = PickleStatsGatherer(self.basedir, verbose)
+        self.stats_gatherer.setServiceParent(self)
+
+        portnumfile = os.path.join(self.basedir, "portnum")
         try:
-            portnum = int(open(portnumfile, "r").read())
-        except (EnvironmentError, ValueError):
-            portnum = 0
-        self._tub.listenOn("tcp:%d" % portnum)
-        d = defer.maybeDeferred(get_local_ip_for)
-        d.addCallback(self._set_location)
-        d.addCallback(lambda res: self._tub)
-        return d
-
-    def _set_location(self, local_address):
-        if local_address is None:
-            local_addresses = ["127.0.0.1"]
-        else:
-            local_addresses = [local_address, "127.0.0.1"]
-        l = self._tub.getListeners()[0]
-        portnum = l.getPortnum()
-        portnumfile = "portnum"
-        open(portnumfile, "w").write("%d\n" % portnum)
-        local_addresses = [ "%s:%d" % (addr, portnum,)
-                            for addr in local_addresses ]
-        assert len(local_addresses) >= 1
-        location = ",".join(local_addresses)
-        self._tub.setLocation(location)
-
-    def _tub_ready(self, tub):
-        sg = PickleStatsGatherer(tub, 'stats.pickle')
-        sg.setServiceParent(tub)
-        sg.verbose = True
-        print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
-
-def main(argv):
-    ga = GathererApp()
-    reactor.run()
-
-if __name__ == '__main__':
-    main(sys.argv)
+            portnum = open(portnumfile, "r").read()
+        except EnvironmentError:
+            portnum = None
+        self.listener = self.tub.listenOn(portnum or "tcp:0")
+        d = self.tub.setLocationAutomatically()
+        if portnum is None:
+            d.addCallback(self.save_portnum)
+        d.addCallback(self.tub_ready)
+        d.addErrback(log.err)
+
+    def save_portnum(self, junk):
+        portnum = self.listener.getPortnum()
+        portnumfile = os.path.join(self.basedir, 'portnum')
+        open(portnumfile, 'wb').write('%d\n' % (portnum,))
+
+    def tub_ready(self, ignored):
+        ff = os.path.join(self.basedir, self.furl_file)
+        self.gatherer_furl = self.tub.registerReference(self.stats_gatherer,
+                                                        furlFile=ff)