]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
stats: make StatsGatherer happy about sharing a process with other services, add...
authorBrian Warner <warner@allmydata.com>
Tue, 4 Mar 2008 06:55:58 +0000 (23:55 -0700)
committerBrian Warner <warner@allmydata.com>
Tue, 4 Mar 2008 06:55:58 +0000 (23:55 -0700)
src/allmydata/stats.py
src/allmydata/test/test_system.py

index 1d060d6e034cce94b52b9e19e9c3e4dba5792eac..9d3d97dde15a231f4bc8f3f57c9565b6903657b8 100644 (file)
@@ -12,6 +12,8 @@ from twisted.application.internet import TimerService
 from zope.interface import implements
 import foolscap
 from foolscap.logging.gatherer import get_local_ip_for
+from twisted.internet.error import ConnectionDone, ConnectionLost
+from foolscap import DeadReferenceError
 
 from allmydata.util import log
 from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
@@ -29,17 +31,23 @@ class LoadMonitor(service.MultiService):
         self.started = False
         self.last = None
         self.stats = deque()
+        self.timer = None
 
     def startService(self):
         if not self.started:
             self.started = True
-            reactor.callLater(self.loop_interval, self.loop)
+            self.timer = reactor.callLater(self.loop_interval, self.loop)
         service.MultiService.startService(self)
 
     def stopService(self):
         self.started = False
+        if self.timer:
+            self.timer.cancel()
+            self.timer = None
+        return service.MultiService.stopService(self)
 
     def loop(self):
+        self.timer = None
         if not self.started:
             return
         now = time.time()
@@ -53,7 +61,7 @@ class LoadMonitor(service.MultiService):
                 self.stats.popleft()
 
         self.last = now
-        reactor.callLater(self.loop_interval, self.loop)
+        self.timer = reactor.callLater(self.loop_interval, self.loop)
 
     def get_stats(self):
         if self.stats:
@@ -102,27 +110,34 @@ class StatsProvider(foolscap.Referenceable, service.MultiService):
         return { 'counters': self.counters, 'stats': stats }
 
     def _connected(self, gatherer, nickname):
-        gatherer.callRemote('provide', self, nickname or '')
+        gatherer.callRemoteOnly('provide', self, nickname or '')
 
 class StatsGatherer(foolscap.Referenceable, service.MultiService):
     implements(RIStatsGatherer)
 
     poll_interval = 60
 
-    def __init__(self, tub):
+    def __init__(self, tub, basedir):
         service.MultiService.__init__(self)
         self.tub = tub
+        self.basedir = basedir
 
         self.clients = {}
         self.nicknames = {}
 
     def startService(self):
+        # the Tub must have a location set on it by now
+        service.MultiService.startService(self)
         self.timer = TimerService(self.poll_interval, self.poll)
         self.timer.setServiceParent(self)
-        service.MultiService.startService(self)
+        self.registerGatherer()
 
     def get_furl(self):
-        return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
+        return self.my_furl
+
+    def registerGatherer(self):
+        furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
+        self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
 
     def get_tubid(self, rref):
         return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
@@ -135,43 +150,55 @@ class StatsGatherer(foolscap.Referenceable, service.MultiService):
             return
         self.clients[tubid] = provider
         self.nicknames[tubid] = nickname
-        provider.notifyOnDisconnect(self.lost_client, tubid)
-
-    def lost_client(self, tubid):
-        del self.clients[tubid]
-        del self.nicknames[tubid]
 
     def poll(self):
         for tubid,client in self.clients.items():
             nickname = self.nicknames.get(tubid)
             d = client.callRemote('get_stats')
-            d.addCallback(self.got_stats, tubid, nickname)
+            d.addCallbacks(self.got_stats, self.lost_client,
+                           callbackArgs=(tubid, nickname),
+                           errbackArgs=(tubid,))
+            d.addErrback(self.log_client_error, tubid)
+
+    def lost_client(self, f, tubid):
+        # this is called lazily, when a get_stats request fails
+        del self.clients[tubid]
+        del self.nicknames[tubid]
+        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)
+
+    def log_client_error(self, f, tubid):
+        log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
+                level=log.UNUSUAL, failure=f)
 
     def got_stats(self, stats, tubid, nickname):
         raise NotImplementedError()
 
 class StdOutStatsGatherer(StatsGatherer):
+    verbose = True
     def remote_provide(self, provider, nickname):
         tubid = self.get_tubid(provider)
-        print 'connect "%s" [%s]' % (nickname, tubid)
+        if self.verbose:
+            print 'connect "%s" [%s]' % (nickname, tubid)
+            provider.notifyOnDisconnect(self.announce_lost_client, tubid)
         StatsGatherer.remote_provide(self, provider, nickname)
 
-    def lost_client(self, tubid):
+    def announce_lost_client(self, tubid):
         print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
-        StatsGatherer.lost_client(self, tubid)
 
     def got_stats(self, stats, tubid, nickname):
         print '"%s" [%s]:' % (nickname, tubid)
         pprint.pprint(stats)
 
-class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications;
-#class PickleStatsGatherer(StatsGatherer):
-    def __init__(self, tub, picklefile):
-        StatsGatherer.__init__(self, tub)
-        self.picklefile = picklefile
+class PickleStatsGatherer(StdOutStatsGatherer):
+    # inherit from StdOutStatsGatherer for connect/disconnect notifications
+
+    def __init__(self, tub, basedir=".", verbose=True):
+        self.verbose = verbose
+        StatsGatherer.__init__(self, tub, basedir)
+        self.picklefile = os.path.join(basedir, "stats.pickle")
 
-        if os.path.exists(picklefile):
-            f = open(picklefile, 'rb')
+        if os.path.exists(self.picklefile):
+            f = open(self.picklefile, 'rb')
             self.gathered_stats = pickle.load(f)
             f.close()
         else:
@@ -230,7 +257,7 @@ class GathererApp(object):
         self._tub.setLocation(location)
 
     def _tub_ready(self, tub):
-        sg = PickleStatsGatherer(tub, 'stats.pickle')
+        sg = PickleStatsGatherer(tub, ".")
         sg.setServiceParent(tub)
         sg.verbose = True
         print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
index 8c6658a6c0c59867638f4a295397f21488bd5221..bd9596202a6f758bfcdb2084aa322cdf7bf8545f 100644 (file)
@@ -14,8 +14,9 @@ from allmydata.util import log
 from allmydata.scripts import runner
 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
 from allmydata.mutable import NotMutableError
+from allmydata.stats import PickleStatsGatherer
 from foolscap.eventual import flushEventualQueue
-from foolscap import DeadReferenceError
+from foolscap import DeadReferenceError, Tub
 from twisted.python.failure import Failure
 from twisted.web.client import getPage
 from twisted.web.error import Error
@@ -73,9 +74,24 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         iv = IntroducerNode(basedir=iv_dir)
         self.introducer = self.add_service(iv)
         d = self.introducer.when_tub_ready()
+        d.addCallback(self._set_up_stats_gatherer)
         d.addCallback(self._set_up_nodes_2)
+        d.addCallback(self._grab_stats)
         return d
 
+    def _set_up_stats_gatherer(self, res):
+        statsdir = self.getdir("stats_gatherer")
+        fileutil.make_dirs(statsdir)
+        t = Tub()
+        self.add_service(t)
+        l = t.listenOn("tcp:0")
+        p = l.getPortnum()
+        t.setLocation("localhost:%d" % p)
+
+        self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
+        self.add_service(self.stats_gatherer)
+        self.stats_gatherer_furl = self.stats_gatherer.get_furl()
+
     def _set_up_nodes_2(self, res):
         q = self.introducer
         self.introducer_furl = q.introducer_url
@@ -96,6 +112,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
                 fileutil.make_dirs(os.path.join(basedir, "private"))
                 open(os.path.join(basedir, "private", "root_dir.cap"), "w")
             open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+            open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
 
         # start client[0], wait for it's tub to be ready (at which point it
         # will have registered the helper furl).
@@ -131,6 +148,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         d.addCallback(_connected)
         return d
 
+    def _grab_stats(self, res):
+        d = self.stats_gatherer.poll()
+        return d
+
     def bounce_client(self, num):
         c = self.clients[num]
         d = c.disownServiceParent()
@@ -224,15 +245,16 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
     del test_connections
 
     def test_upload_and_download_random_key(self):
+        self.basedir = "system/SystemTest/test_upload_and_download_random_key"
         return self._test_upload_and_download(False)
     test_upload_and_download_random_key.timeout = 4800
 
     def test_upload_and_download_content_hash_key(self):
+        self.basedir = "system/SystemTest/test_upload_and_download_CHK"
         return self._test_upload_and_download(True)
     test_upload_and_download_content_hash_key.timeout = 4800
 
     def _test_upload_and_download(self, contenthashkey):
-        self.basedir = "system/SystemTest/test_upload_and_download"
         # we use 4000 bytes of data, which will result in about 400k written
         # to disk among all our simulated nodes
         DATA = "Some data to upload\n" * 200
@@ -836,6 +858,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
         # P/test_put/  (empty)
         d.addCallback(self._test_checker)
         d.addCallback(self._test_verifier)
+        d.addCallback(self._grab_stats)
         return d
     test_vdrive.timeout = 1100