from allmydata.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, testutil, fileutil
+from allmydata.util import hashutil, base32, pollmixin, fileutil
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
from allmydata.mutable.node import MutableFileNode, MutableWatcher
def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
-class Client(node.Node, testutil.PollMixin):
+class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
PORTNUMFILE = "client.port"
from allmydata import client, introducer
from allmydata.immutable import upload
from allmydata.scripts import create_node
-from allmydata.util import testutil, fileutil
+from allmydata.util import fileutil, pollmixin
import foolscap
from foolscap import eventual
from twisted.python import log
reactor.connectTCP(host, port, factory)
return factory.deferred
-class SystemFramework(testutil.PollMixin):
+class SystemFramework(pollmixin.PollMixin):
numnodes = 5
def __init__(self, basedir, mode):
DeepCheckResults, DeepCheckAndRepairResults
from allmydata.mutable.common import CorruptShareError
from allmydata.storage import storage_index_to_dir
-from allmydata.util import log, testutil, fileutil
+from allmydata.util import log, testutil, fileutil, pollmixin
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
return log.msg(*args, **kwargs)
-class SystemTestMixin(testutil.PollMixin, testutil.StallMixin):
+class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def setUp(self):
self.sparent = service.MultiService()
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
-from allmydata.util import testutil, idlib
+from allmydata.util import testutil, idlib, pollmixin
class FakeNode(Referenceable):
pass
d.addCallback(flushEventualQueue)
return d
-class Introducer(ServiceMixin, unittest.TestCase, testutil.PollMixin):
+class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", "my_nickname",
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
-class SystemTestMixin(ServiceMixin, testutil.PollMixin):
+class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
def setUp(self):
ServiceMixin.setUp(self)
from foolscap import Tub, eventual
from allmydata import key_generator
-from allmydata.util import testutil
+from allmydata.util import pollmixin
from pycryptopp.publickey import rsa
def flush_but_dont_ignore(res):
d.addCallback(_done)
return d
-class KeyGenService(unittest.TestCase, testutil.PollMixin):
+class KeyGenService(unittest.TestCase, pollmixin.PollMixin):
def setUp(self):
self.parent = service.MultiService()
self.parent.startService()
from twisted.internet import defer
import os.path, re
from allmydata.scripts import runner
-from allmydata.util import fileutil, testutil
+from allmydata.util import fileutil, pollmixin
class CreateNode(unittest.TestCase):
def workdir(self, name):
[],
run_by_human=False)
-class RunNode(unittest.TestCase, testutil.PollMixin):
+class RunNode(unittest.TestCase, pollmixin.PollMixin):
def workdir(self, name):
basedir = os.path.join("test_runner", "RunNode", name)
fileutil.make_dirs(basedir)
from twisted.trial import unittest
from twisted.application import service
from allmydata.stats import CPUUsageMonitor
-from allmydata.util import testutil
+from allmydata.util import testutil, pollmixin
class FasterMonitor(CPUUsageMonitor):
POLL_INTERVAL = 0.1
-class CPUUsage(unittest.TestCase, testutil.PollMixin, testutil.StallMixin):
+class CPUUsage(unittest.TestCase, pollmixin.PollMixin, testutil.StallMixin):
def setUp(self):
self.s = service.MultiService()
self.s.startService()
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, testutil, deferredutil
-from allmydata.util import limiter, time_format
+from allmydata.util import limiter, time_format, pollmixin
class Base32(unittest.TestCase):
def test_b2a_matches_Pythons(self):
class PollMixinTests(unittest.TestCase):
def setUp(self):
- self.pm = testutil.PollMixin()
+ self.pm = pollmixin.PollMixin()
def test_PollMixin_True(self):
d = self.pm.poll(check_f=lambda : True,
def _suc(res):
self.fail("poll should have failed, not returned %s" % (res,))
def _err(f):
- f.trap(testutil.TimeoutError)
+ f.trap(pollmixin.TimeoutError)
return None # success
d.addCallbacks(_suc, _err)
return d
--- /dev/null
+
+import time
+from twisted.internet import task
+
+class TimeoutError(Exception):
+ pass
+
+class PollComplete(Exception):
+ pass
+
+class PollMixin:
+
+ def poll(self, check_f, pollinterval=0.01, timeout=100):
+ # Return a Deferred, then call check_f periodically until it returns
+ # True, at which point the Deferred will fire.. If check_f raises an
+ # exception, the Deferred will errback. If the check_f does not
+ # indicate success within timeout= seconds, the Deferred will
+ # errback. If timeout=None, no timeout will be enforced, and the loop
+ # will poll forever (or really until Trial times out).
+ cutoff = None
+ if timeout is not None:
+ cutoff = time.time() + timeout
+ lc = task.LoopingCall(self._poll, check_f, cutoff)
+ d = lc.start(pollinterval)
+ def _convert_done(f):
+ f.trap(PollComplete)
+ return None
+ d.addErrback(_convert_done)
+ return d
+
+ def _poll(self, check_f, cutoff):
+ if cutoff is not None and time.time() > cutoff:
+ raise TimeoutError("PollMixin never saw %s return True" % check_f)
+ if check_f():
+ raise PollComplete()
+
import os, signal, time
from random import randrange
-from twisted.internet import reactor, defer, task
+from twisted.internet import reactor, defer
from twisted.python import failure
def insecurerandstr(n):
if self.sigchldHandler:
signal.signal(signal.SIGCHLD, self.sigchldHandler)
-class TimeoutError(Exception):
- pass
-
-class PollComplete(Exception):
- pass
-
-class PollMixin:
-
- def poll(self, check_f, pollinterval=0.01, timeout=100):
- # Return a Deferred, then call check_f periodically until it returns
- # True, at which point the Deferred will fire.. If check_f raises an
- # exception, the Deferred will errback. If the check_f does not
- # indicate success within timeout= seconds, the Deferred will
- # errback. If timeout=None, no timeout will be enforced, and the loop
- # will poll forever (or really until Trial times out).
- cutoff = None
- if timeout is not None:
- cutoff = time.time() + timeout
- lc = task.LoopingCall(self._poll, check_f, cutoff)
- d = lc.start(pollinterval)
- def _convert_done(f):
- f.trap(PollComplete)
- return None
- d.addErrback(_convert_done)
- return d
-
- def _poll(self, check_f, cutoff):
- if cutoff is not None and time.time() > cutoff:
- raise TimeoutError("PollMixin never saw %s return True" % check_f)
- if check_f():
- raise PollComplete()
-
class StallMixin:
def stall(self, res=None, delay=1):
d = defer.Deferred()