From: Brian Warner Date: Tue, 4 Mar 2008 06:55:58 +0000 (-0700) Subject: stats: make StatsGatherer happy about sharing a process with other services, add... X-Git-Tag: allmydata-tahoe-0.9.0~78 X-Git-Url: https://git.rkrishnan.org/components/%22news.html/frontends/FTP-and-SFTP.txt?a=commitdiff_plain;h=7e159feb277873429225d72526101bdbe6201178;p=tahoe-lafs%2Ftahoe-lafs.git stats: make StatsGatherer happy about sharing a process with other services, add one during system test to get some test coverage --- diff --git a/src/allmydata/stats.py b/src/allmydata/stats.py index 1d060d6e..9d3d97dd 100644 --- a/src/allmydata/stats.py +++ b/src/allmydata/stats.py @@ -12,6 +12,8 @@ from twisted.application.internet import TimerService from zope.interface import implements import foolscap from foolscap.logging.gatherer import get_local_ip_for +from twisted.internet.error import ConnectionDone, ConnectionLost +from foolscap import DeadReferenceError from allmydata.util import log from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer @@ -29,17 +31,23 @@ class LoadMonitor(service.MultiService): self.started = False self.last = None self.stats = deque() + self.timer = None def startService(self): if not self.started: self.started = True - reactor.callLater(self.loop_interval, self.loop) + self.timer = reactor.callLater(self.loop_interval, self.loop) service.MultiService.startService(self) def stopService(self): self.started = False + if self.timer: + self.timer.cancel() + self.timer = None + return service.MultiService.stopService(self) def loop(self): + self.timer = None if not self.started: return now = time.time() @@ -53,7 +61,7 @@ class LoadMonitor(service.MultiService): self.stats.popleft() self.last = now - reactor.callLater(self.loop_interval, self.loop) + self.timer = reactor.callLater(self.loop_interval, self.loop) def get_stats(self): if self.stats: @@ -102,27 +110,34 @@ class StatsProvider(foolscap.Referenceable, service.MultiService): return { 'counters': self.counters, 'stats': stats } def _connected(self, gatherer, nickname): - gatherer.callRemote('provide', self, nickname or '') + gatherer.callRemoteOnly('provide', self, nickname or '') class StatsGatherer(foolscap.Referenceable, service.MultiService): implements(RIStatsGatherer) poll_interval = 60 - def __init__(self, tub): + def __init__(self, tub, basedir): service.MultiService.__init__(self) self.tub = tub + self.basedir = basedir self.clients = {} self.nicknames = {} def startService(self): + # the Tub must have a location set on it by now + service.MultiService.startService(self) self.timer = TimerService(self.poll_interval, self.poll) self.timer.setServiceParent(self) - service.MultiService.startService(self) + self.registerGatherer() def get_furl(self): - return self.tub.registerReference(self, furlFile='stats_gatherer.furl') + return self.my_furl + + def registerGatherer(self): + furl_file = os.path.join(self.basedir, "stats_gatherer.furl") + self.my_furl = self.tub.registerReference(self, furlFile=furl_file) def get_tubid(self, rref): return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID() @@ -135,43 +150,55 @@ class StatsGatherer(foolscap.Referenceable, service.MultiService): return self.clients[tubid] = provider self.nicknames[tubid] = nickname - provider.notifyOnDisconnect(self.lost_client, tubid) - - def lost_client(self, tubid): - del self.clients[tubid] - del self.nicknames[tubid] def poll(self): for tubid,client in self.clients.items(): nickname = self.nicknames.get(tubid) d = client.callRemote('get_stats') - d.addCallback(self.got_stats, tubid, nickname) + d.addCallbacks(self.got_stats, self.lost_client, + callbackArgs=(tubid, nickname), + errbackArgs=(tubid,)) + d.addErrback(self.log_client_error, tubid) + + def lost_client(self, f, tubid): + # this is called lazily, when a get_stats request fails + del self.clients[tubid] + del self.nicknames[tubid] + f.trap(DeadReferenceError, ConnectionDone, ConnectionLost) + + def log_client_error(self, f, tubid): + log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid, + level=log.UNUSUAL, failure=f) def got_stats(self, stats, tubid, nickname): raise NotImplementedError() class StdOutStatsGatherer(StatsGatherer): + verbose = True def remote_provide(self, provider, nickname): tubid = self.get_tubid(provider) - print 'connect "%s" [%s]' % (nickname, tubid) + if self.verbose: + print 'connect "%s" [%s]' % (nickname, tubid) + provider.notifyOnDisconnect(self.announce_lost_client, tubid) StatsGatherer.remote_provide(self, provider, nickname) - def lost_client(self, tubid): + def announce_lost_client(self, tubid): print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid) - StatsGatherer.lost_client(self, tubid) def got_stats(self, stats, tubid, nickname): print '"%s" [%s]:' % (nickname, tubid) pprint.pprint(stats) -class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications; -#class PickleStatsGatherer(StatsGatherer): - def __init__(self, tub, picklefile): - StatsGatherer.__init__(self, tub) - self.picklefile = picklefile +class PickleStatsGatherer(StdOutStatsGatherer): + # inherit from StdOutStatsGatherer for connect/disconnect notifications + + def __init__(self, tub, basedir=".", verbose=True): + self.verbose = verbose + StatsGatherer.__init__(self, tub, basedir) + self.picklefile = os.path.join(basedir, "stats.pickle") - if os.path.exists(picklefile): - f = open(picklefile, 'rb') + if os.path.exists(self.picklefile): + f = open(self.picklefile, 'rb') self.gathered_stats = pickle.load(f) f.close() else: @@ -230,7 +257,7 @@ class GathererApp(object): self._tub.setLocation(location) def _tub_ready(self, tub): - sg = PickleStatsGatherer(tub, 'stats.pickle') + sg = PickleStatsGatherer(tub, ".") sg.setServiceParent(tub) sg.verbose = True print '\nStatsGatherer: %s\n' % (sg.get_furl(),) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 8c6658a6..bd959620 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -14,8 +14,9 @@ from allmydata.util import log from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI from allmydata.mutable import NotMutableError +from allmydata.stats import PickleStatsGatherer from foolscap.eventual import flushEventualQueue -from foolscap import DeadReferenceError +from foolscap import DeadReferenceError, Tub from twisted.python.failure import Failure from twisted.web.client import getPage from twisted.web.error import Error @@ -73,9 +74,24 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): iv = IntroducerNode(basedir=iv_dir) self.introducer = self.add_service(iv) d = self.introducer.when_tub_ready() + d.addCallback(self._set_up_stats_gatherer) d.addCallback(self._set_up_nodes_2) + d.addCallback(self._grab_stats) return d + def _set_up_stats_gatherer(self, res): + statsdir = self.getdir("stats_gatherer") + fileutil.make_dirs(statsdir) + t = Tub() + self.add_service(t) + l = t.listenOn("tcp:0") + p = l.getPortnum() + t.setLocation("localhost:%d" % p) + + self.stats_gatherer = PickleStatsGatherer(t, statsdir, False) + self.add_service(self.stats_gatherer) + self.stats_gatherer_furl = self.stats_gatherer.get_furl() + def _set_up_nodes_2(self, res): q = self.introducer self.introducer_furl = q.introducer_url @@ -96,6 +112,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): fileutil.make_dirs(os.path.join(basedir, "private")) open(os.path.join(basedir, "private", "root_dir.cap"), "w") open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl) + open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl) # start client[0], wait for it's tub to be ready (at which point it # will have registered the helper furl). @@ -131,6 +148,10 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): d.addCallback(_connected) return d + def _grab_stats(self, res): + d = self.stats_gatherer.poll() + return d + def bounce_client(self, num): c = self.clients[num] d = c.disownServiceParent() @@ -224,15 +245,16 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): del test_connections def test_upload_and_download_random_key(self): + self.basedir = "system/SystemTest/test_upload_and_download_random_key" return self._test_upload_and_download(False) test_upload_and_download_random_key.timeout = 4800 def test_upload_and_download_content_hash_key(self): + self.basedir = "system/SystemTest/test_upload_and_download_CHK" return self._test_upload_and_download(True) test_upload_and_download_content_hash_key.timeout = 4800 def _test_upload_and_download(self, contenthashkey): - self.basedir = "system/SystemTest/test_upload_and_download" # we use 4000 bytes of data, which will result in about 400k written # to disk among all our simulated nodes DATA = "Some data to upload\n" * 200 @@ -836,6 +858,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): # P/test_put/ (empty) d.addCallback(self._test_checker) d.addCallback(self._test_verifier) + d.addCallback(self._grab_stats) return d test_vdrive.timeout = 1100