from zope.interface import implements
import foolscap
from foolscap.logging.gatherer import get_local_ip_for
+from twisted.internet.error import ConnectionDone, ConnectionLost
+from foolscap import DeadReferenceError
from allmydata.util import log
from allmydata.interfaces import RIStatsProvider, RIStatsGatherer, IStatsProducer
self.started = False
self.last = None
self.stats = deque()
+ self.timer = None
def startService(self):
if not self.started:
self.started = True
- reactor.callLater(self.loop_interval, self.loop)
+ self.timer = reactor.callLater(self.loop_interval, self.loop)
service.MultiService.startService(self)
def stopService(self):
self.started = False
+ if self.timer:
+ self.timer.cancel()
+ self.timer = None
+ return service.MultiService.stopService(self)
def loop(self):
+ self.timer = None
if not self.started:
return
now = time.time()
self.stats.popleft()
self.last = now
- reactor.callLater(self.loop_interval, self.loop)
+ self.timer = reactor.callLater(self.loop_interval, self.loop)
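The timer bookkeeping above exists so that stopService() can cancel a poll that is still pending; a minimal, self-contained sketch of the same pattern (hypothetical Poller class, not taken from this patch):

from twisted.application import service
from twisted.internet import reactor

class Poller(service.Service):
    poll_interval = 60

    def __init__(self):
        self.timer = None

    def startService(self):
        service.Service.startService(self)
        self.timer = reactor.callLater(self.poll_interval, self._poll)

    def _poll(self):
        self.timer = None            # the pending call has now fired
        if not self.running:
            return
        # ... periodic work goes here ...
        self.timer = reactor.callLater(self.poll_interval, self._poll)

    def stopService(self):
        if self.timer is not None and self.timer.active():
            self.timer.cancel()      # don't leave a stray DelayedCall behind
        self.timer = None
        return service.Service.stopService(self)
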
def get_stats(self):
if self.stats:
return { 'counters': self.counters, 'stats': stats }
def _connected(self, gatherer, nickname):
- gatherer.callRemote('provide', self, nickname or '')
+ gatherer.callRemoteOnly('provide', self, nickname or '')
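callRemoteOnly() is foolscap's fire-and-forget variant of callRemote(): it returns None rather than a Deferred, so a provider whose gatherer goes away is not left holding an unhandled error. A small sketch of the distinction, with a hypothetical helper name:

def register_with_gatherer(rref, provider, nickname):
    # rref.callRemote('provide', ...) would return a Deferred whose errback
    # we would have to handle ourselves if the gatherer were unreachable;
    # callRemoteOnly returns None and discards any delivery failure, which
    # is fine for a best-effort registration sent after every reconnect.
    rref.callRemoteOnly('provide', provider, nickname or '')
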
class StatsGatherer(foolscap.Referenceable, service.MultiService):
implements(RIStatsGatherer)
poll_interval = 60
- def __init__(self, tub):
+ def __init__(self, tub, basedir):
service.MultiService.__init__(self)
self.tub = tub
+ self.basedir = basedir
self.clients = {}
self.nicknames = {}
def startService(self):
+ # the Tub must have a location set on it by now
+ service.MultiService.startService(self)
self.timer = TimerService(self.poll_interval, self.poll)
self.timer.setServiceParent(self)
- service.MultiService.startService(self)
+ self.registerGatherer()
def get_furl(self):
- return self.tub.registerReference(self, furlFile='stats_gatherer.furl')
+ return self.my_furl
+
+ def registerGatherer(self):
+ furl_file = os.path.join(self.basedir, "stats_gatherer.furl")
+ self.my_furl = self.tub.registerReference(self, furlFile=furl_file)
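Passing furlFile= to Tub.registerReference() makes foolscap persist and reuse the FURL across runs, which is what keeps stats_gatherer.furl stable for the providers. A sketch under that assumption, with hypothetical names (ExampleGatherer, publish_gatherer):

import os
from foolscap import Referenceable

class ExampleGatherer(Referenceable):
    pass

def publish_gatherer(tub, basedir):
    # foolscap writes the new FURL into furl_file on first registration and
    # reuses the file's contents on later runs, so the FURL handed out to
    # providers survives restarts of the gatherer
    furl_file = os.path.join(basedir, "stats_gatherer.furl")
    return tub.registerReference(ExampleGatherer(), furlFile=furl_file)
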
def get_tubid(self, rref):
return foolscap.SturdyRef(rref.tracker.getURL()).getTubRef().getTubID()
return
self.clients[tubid] = provider
self.nicknames[tubid] = nickname
- provider.notifyOnDisconnect(self.lost_client, tubid)
-
- def lost_client(self, tubid):
- del self.clients[tubid]
- del self.nicknames[tubid]
def poll(self):
for tubid,client in self.clients.items():
nickname = self.nicknames.get(tubid)
d = client.callRemote('get_stats')
- d.addCallback(self.got_stats, tubid, nickname)
+ d.addCallbacks(self.got_stats, self.lost_client,
+ callbackArgs=(tubid, nickname),
+ errbackArgs=(tubid,))
+ d.addErrback(self.log_client_error, tubid)
+
+ def lost_client(self, f, tubid):
+ # this is called lazily, when a get_stats request fails
+ del self.clients[tubid]
+ del self.nicknames[tubid]
+ f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)
+
+ def log_client_error(self, f, tubid):
+ log.msg("StatsGatherer: error in get_stats(), peerid=%s" % tubid,
+ level=log.UNUSUAL, failure=f)
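The ordering in poll() matters: lost_client() sees only failures from the callRemote itself, f.trap() swallows the expected disconnection errors, and anything else (including an exception raised inside got_stats) falls through to log_client_error(). A condensed sketch of the same chain, with hypothetical helper names:

from twisted.internet.error import ConnectionDone, ConnectionLost
from foolscap import DeadReferenceError

def poll_one(rref, tubid, nickname, clients, nicknames, on_stats, log_error):
    def forget_client(f, tubid):
        clients.pop(tubid, None)
        nicknames.pop(tubid, None)
        # expected losses are swallowed here; anything else is re-raised by
        # trap() and falls through to the log_error errback below
        f.trap(DeadReferenceError, ConnectionDone, ConnectionLost)

    d = rref.callRemote('get_stats')
    d.addCallbacks(on_stats, forget_client,
                   callbackArgs=(tubid, nickname),
                   errbackArgs=(tubid,))
    d.addErrback(log_error, tubid)
    return d
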
def got_stats(self, stats, tubid, nickname):
raise NotImplementedError()
class StdOutStatsGatherer(StatsGatherer):
+ verbose = True
def remote_provide(self, provider, nickname):
tubid = self.get_tubid(provider)
- print 'connect "%s" [%s]' % (nickname, tubid)
+ if self.verbose:
+ print 'connect "%s" [%s]' % (nickname, tubid)
+ provider.notifyOnDisconnect(self.announce_lost_client, tubid)
StatsGatherer.remote_provide(self, provider, nickname)
- def lost_client(self, tubid):
+ def announce_lost_client(self, tubid):
print 'disconnect "%s" [%s]:' % (self.nicknames[tubid], tubid)
- StatsGatherer.lost_client(self, tubid)
def got_stats(self, stats, tubid, nickname):
print '"%s" [%s]:' % (nickname, tubid)
pprint.pprint(stats)
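notifyOnDisconnect() asks foolscap to run a callback when the connection to the provider goes away; here it only drives the console announcement, while the clients/nicknames tables are cleaned up lazily by lost_client() on the next failed poll. A minimal sketch with a hypothetical helper:

def watch_provider(rref, tubid):
    def announce(tubid):
        print 'disconnect [%s]' % tubid
    # foolscap invokes announce(tubid) once, when the connection to the
    # provider is lost; the returned marker can later be handed to
    # rref.dontNotifyOnDisconnect() to cancel the notification
    marker = rref.notifyOnDisconnect(announce, tubid)
    return marker
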
-class PickleStatsGatherer(StdOutStatsGatherer): # for connect/disconnect notifications;
-#class PickleStatsGatherer(StatsGatherer):
- def __init__(self, tub, picklefile):
- StatsGatherer.__init__(self, tub)
- self.picklefile = picklefile
+class PickleStatsGatherer(StdOutStatsGatherer):
+ # inherit from StdOutStatsGatherer for connect/disconnect notifications
+
+ def __init__(self, tub, basedir=".", verbose=True):
+ self.verbose = verbose
+ StatsGatherer.__init__(self, tub, basedir)
+ self.picklefile = os.path.join(basedir, "stats.pickle")
- if os.path.exists(picklefile):
- f = open(picklefile, 'rb')
+ if os.path.exists(self.picklefile):
+ f = open(self.picklefile, 'rb')
self.gathered_stats = pickle.load(f)
f.close()
else:
self._tub.setLocation(location)
def _tub_ready(self, tub):
- sg = PickleStatsGatherer(tub, 'stats.pickle')
+ sg = PickleStatsGatherer(tub, ".")
sg.setServiceParent(tub)
sg.verbose = True
print '\nStatsGatherer: %s\n' % (sg.get_furl(),)
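The body of PickleStatsGatherer.got_stats() is not shown in these hunks; one plausible shape for it, offered only as an assumption, folds each sample into gathered_stats and rewrites the pickle (hypothetical record_stats helper):

import os, pickle, time

def record_stats(gathered_stats, picklefile, stats, tubid, nickname):
    # keep the most recent sample per provider, keyed by tubid
    s = gathered_stats.setdefault(tubid, {})
    s['timestamp'] = time.time()
    s['nickname'] = nickname
    s['stats'] = stats
    # dump to a temporary file and rename it into place, so a crash in the
    # middle of the dump cannot leave a truncated stats.pickle behind
    tmp = picklefile + ".tmp"
    f = open(tmp, 'wb')
    pickle.dump(gathered_stats, f)
    f.close()
    os.rename(tmp, picklefile)
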
from allmydata.scripts import runner
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable import NotMutableError
+from allmydata.stats import PickleStatsGatherer
from foolscap.eventual import flushEventualQueue
-from foolscap import DeadReferenceError
+from foolscap import DeadReferenceError, Tub
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
iv = IntroducerNode(basedir=iv_dir)
self.introducer = self.add_service(iv)
d = self.introducer.when_tub_ready()
+ d.addCallback(self._set_up_stats_gatherer)
d.addCallback(self._set_up_nodes_2)
+ d.addCallback(self._grab_stats)
return d
+ def _set_up_stats_gatherer(self, res):
+ statsdir = self.getdir("stats_gatherer")
+ fileutil.make_dirs(statsdir)
+ t = Tub()
+ self.add_service(t)
+ l = t.listenOn("tcp:0")
+ p = l.getPortnum()
+ t.setLocation("localhost:%d" % p)
+
+ self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
+ self.add_service(self.stats_gatherer)
+ self.stats_gatherer_furl = self.stats_gatherer.get_furl()
+
def _set_up_nodes_2(self, res):
q = self.introducer
self.introducer_furl = q.introducer_url
fileutil.make_dirs(os.path.join(basedir, "private"))
open(os.path.join(basedir, "private", "root_dir.cap"), "w")
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
+ open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
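On the provider side, a client that finds stats_gatherer.furl in its base directory would presumably connect with Tub.connectTo(), which keeps reconnecting and re-fires its callback with a fresh RemoteReference each time. A sketch of that hookup, with hypothetical names:

import os

def connect_stats_provider(tub, basedir, provider, nickname):
    furl_path = os.path.join(basedir, "stats_gatherer.furl")
    if not os.path.exists(furl_path):
        return None                     # stats gathering not configured
    furl = open(furl_path, "r").read().strip()
    def _connected(gatherer):
        # re-register after every reconnect; fire-and-forget is enough here
        gatherer.callRemoteOnly('provide', provider, nickname or '')
    # connectTo returns a Reconnector that re-establishes the connection
    # (and re-fires _connected) whenever it is lost
    return tub.connectTo(furl, _connected)
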
# start client[0], wait for its tub to be ready (at which point it
# will have registered the helper furl).
d.addCallback(_connected)
return d
+ def _grab_stats(self, res):
+ d = self.stats_gatherer.poll()
+ return d
+
def bounce_client(self, num):
c = self.clients[num]
d = c.disownServiceParent()
del test_connections
def test_upload_and_download_random_key(self):
+ self.basedir = "system/SystemTest/test_upload_and_download_random_key"
return self._test_upload_and_download(False)
test_upload_and_download_random_key.timeout = 4800
def test_upload_and_download_content_hash_key(self):
+ self.basedir = "system/SystemTest/test_upload_and_download_CHK"
return self._test_upload_and_download(True)
test_upload_and_download_content_hash_key.timeout = 4800
def _test_upload_and_download(self, contenthashkey):
- self.basedir = "system/SystemTest/test_upload_and_download"
# we use 4000 bytes of data, which will result in about 400k written
# to disk among all our simulated nodes
DATA = "Some data to upload\n" * 200
# P/test_put/ (empty)
d.addCallback(self._test_checker)
d.addCallback(self._test_verifier)
+ d.addCallback(self._grab_stats)
return d
test_vdrive.timeout = 1100