DeepCheckResults, DeepCheckAndRepairResults
from allmydata.mutable.common import CorruptShareError
from allmydata.storage import storage_index_to_dir
-from allmydata.util import log, testutil, fileutil, pollmixin
+from allmydata.util import log, fileutil, pollmixin
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
+import common_util as testutil
def flush_but_dont_ignore(res):
--- /dev/null
+import os, signal, time
+from random import randrange
+
+from twisted.internet import reactor, defer
+from twisted.python import failure
+
+def insecurerandstr(n):
+ return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
+
+def flip_bit(good, which):
+ # flip the low-order bit of good[which]
+ if which == -1:
+ pieces = good[:which], good[-1:], ""
+ else:
+ pieces = good[:which], good[which:which+1], good[which+1:]
+ return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
+
+def flip_one_bit(s, offset=0, size=None):
+ """ flip one random bit of the string s, in a byte greater than or equal to offset and less
+ than offset+size. """
+ if size is None:
+ size=len(s)-offset
+ i = randrange(offset, offset+size)
+ result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
+ assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
+ return result
+
+class SignalMixin:
+ # This class is necessary for any code which wants to use Processes
+ # outside the usual reactor.run() environment. It is copied from
+ # Twisted's twisted.test.test_process
+ sigchldHandler = None
+
+ def setUpClass(self):
+ # make sure SIGCHLD handler is installed, as it should be on
+ # reactor.run(). problem is reactor may not have been run when this
+ # test runs.
+ if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
+ self.sigchldHandler = signal.signal(signal.SIGCHLD,
+ reactor._handleSigchld)
+
+ def tearDownClass(self):
+ if self.sigchldHandler:
+ signal.signal(signal.SIGCHLD, self.sigchldHandler)
+
+class StallMixin:
+ def stall(self, res=None, delay=1):
+ d = defer.Deferred()
+ reactor.callLater(delay, d.callback, res)
+ return d
+
+class ShouldFailMixin:
+
+ def shouldFail(self, expected_failure, which, substring,
+ callable, *args, **kwargs):
+ assert substring is None or isinstance(substring, str)
+ d = defer.maybeDeferred(callable, *args, **kwargs)
+ def done(res):
+ if isinstance(res, failure.Failure):
+ res.trap(expected_failure)
+ if substring:
+ self.failUnless(substring in str(res),
+ "%s: substring '%s' not in '%s'"
+ % (which, substring, str(res)))
+ # return the Failure for further analysis, but in a form that
+ # doesn't make the Deferred chain think that we failed.
+ return [res]
+ else:
+ self.fail("%s was supposed to raise %s, not get '%s'" %
+ (which, expected_failure, res))
+ d.addBoth(done)
+ return d
+
+
+class TestMixin(SignalMixin):
+ def setUp(self, repeatable=False):
+ """
+ @param repeatable: install the repeatable_randomness hacks to attempt
+ to without access to real randomness and real time.time from the
+ code under test
+ """
+ self.repeatable = repeatable
+ if self.repeatable:
+ import repeatable_random
+ repeatable_random.force_repeatability()
+ if hasattr(time, 'realtime'):
+ self.teststarttime = time.realtime()
+ else:
+ self.teststarttime = time.time()
+
+ def tearDown(self):
+ if self.repeatable:
+ import repeatable_random
+ repeatable_random.restore_non_repeatability()
+ self.clean_pending(required_to_quiesce=True)
+
+ def clean_pending(self, dummy=None, required_to_quiesce=True):
+ """
+ This handy method cleans all pending tasks from the reactor.
+
+ When writing a unit test, consider the following question:
+
+ Is the code that you are testing required to release control once it
+ has done its job, so that it is impossible for it to later come around
+ (with a delayed reactor task) and do anything further?
+
+ If so, then trial will usefully test that for you -- if the code under
+ test leaves any pending tasks on the reactor then trial will fail it.
+
+ On the other hand, some code is *not* required to release control -- some
+ code is allowed to continuously maintain control by rescheduling reactor
+ tasks in order to do ongoing work. Trial will incorrectly require that
+ code to clean up all its tasks from the reactor.
+
+ Most people think that such code should be amended to have an optional
+ "shutdown" operation that releases all control, but on the contrary it is
+ good design for some code to *not* have a shutdown operation, but instead
+ to have a "crash-only" design in which it recovers from crash on startup.
+
+ If the code under test is of the "long-running" kind, which is *not*
+ required to shutdown cleanly in order to pass tests, then you can simply
+ call testutil.clean_pending() at the end of the unit test, and trial will
+ be satisfied.
+ """
+ pending = reactor.getDelayedCalls()
+ active = bool(pending)
+ for p in pending:
+ if p.active():
+ p.cancel()
+ else:
+ print "WEIRNESS! pending timed call not active+!"
+ if required_to_quiesce and active:
+ self.fail("Reactor was still active when it was required to be quiescent.")
+
+try:
+ import win32file
+ import win32con
+ def make_readonly(path):
+ win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
+ def make_accessible(path):
+ win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
+except ImportError:
+ import stat
+ def _make_readonly(path):
+ os.chmod(path, stat.S_IREAD)
+ os.chmod(os.path.dirname(path), stat.S_IREAD)
+ def _make_accessible(path):
+ os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
+ os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
+ make_readonly = _make_readonly
+ make_accessible = _make_accessible
import allmydata
from allmydata import client
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import base32, testutil
+from allmydata.util import base32
from foolscap.eventual import flushEventualQueue
+import common_util as testutil
class FakeIntroducerClient(IntroducerClient):
def __init__(self):
INewDirectoryURI, IReadonlyNewDirectoryURI, IFileNode, \
ExistingChildError, NoSuchChildError, \
IDeepCheckResults, IDeepCheckAndRepairResults
-from allmydata.util import hashutil, testutil
+from allmydata.util import hashutil
from allmydata.monitor import Monitor
from allmydata.test.common import make_chk_file_uri, make_mutable_file_uri, \
FakeDirectoryNode, create_chk_filenode
from allmydata.checker_results import CheckerResults, CheckAndRepairResults
+import common_util as testutil
# to test dirnode.py, we want to construct a tree of real DirectoryNodes that
# contain pointers to fake files. We start with a fake MutableFileNode that
from foolscap import eventual
from allmydata import hashtree, uri
from allmydata.immutable import encode, upload, download
-from allmydata.util import hashutil, testutil
+from allmydata.util import hashutil
from allmydata.util.assertutil import _assert
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
+import common_util as testutil
class LostPeerError(Exception):
pass
from allmydata.immutable import upload
from allmydata.test.common import SystemTestMixin, ShareManglingMixin
-from allmydata.util import testutil
from allmydata.monitor import Monitor
from allmydata.interfaces import IURI, NotEnoughSharesError
from twisted.internet import defer
from twisted.trial import unittest
import random, struct
+import common_util as testutil
TEST_DATA="\x02"*(upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
from allmydata.introducer import old
-from allmydata.util import testutil, idlib, pollmixin
+from allmydata.util import idlib, pollmixin
+import common_util as testutil
class FakeNode(Referenceable):
pass
-from allmydata.util import iputil, testutil
-
-from twisted.trial import unittest
import re
+from twisted.trial import unittest
+from allmydata.util import iputil
+import common_util as testutil
DOTTED_QUAD_RE=re.compile("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
from twisted.python import failure
from allmydata import uri, storage
from allmydata.immutable import download
-from allmydata.util import base32, testutil, idlib
+from allmydata.util import base32, idlib
from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash
from allmydata.util.fileutil import make_dirs
from allmydata.mutable.layout import unpack_header, unpack_share
from allmydata.mutable.repair import MustForceRepairError
+import common_util as testutil
+
# this "FastMutableFileNode" exists solely to speed up tests by using smaller
# public/private keys. Once we switch to fast DSA-based keys, we can get rid
# of this.
from foolscap.eventual import flushEventualQueue
from twisted.application import service
from allmydata.node import Node, formatTimeTahoeStyle
-from allmydata.util import testutil, fileutil
+from allmydata.util import fileutil
+import common_util as testutil
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
from twisted.trial import unittest
from twisted.application import service
from allmydata.stats import CPUUsageMonitor
-from allmydata.util import testutil, pollmixin
+from allmydata.util import pollmixin
+import common_util as testutil
class FasterMonitor(CPUUsageMonitor):
POLL_INTERVAL = 0.1
import os
+from cStringIO import StringIO
from twisted.trial import unittest
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
-from cStringIO import StringIO
+from foolscap import eventual
from allmydata import uri
from allmydata.immutable import upload
from allmydata.interfaces import IFileURI, FileTooLargeError, NotEnoughSharesError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
-from allmydata.util.testutil import ShouldFailMixin
-from foolscap import eventual
+from common_util import ShouldFailMixin
MiB = 1024*1024
from twisted.python import failure
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
-from allmydata.util import assertutil, fileutil, testutil, deferredutil
+from allmydata.util import assertutil, fileutil, deferredutil
from allmydata.util import limiter, time_format, pollmixin
class Base32(unittest.TestCase):
from allmydata import interfaces, provisioning, uri, webish
from allmydata.immutable import upload, download
from allmydata.web import status, common
-from allmydata.util import fileutil, testutil, base32
+from allmydata.util import fileutil, base32
from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \
FakeMutableFileNode, create_chk_filenode
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
from allmydata.mutable import servermap, publish, retrieve
+import common_util as testutil
# create a fake uploader/downloader, and a couple of fake dirnodes, then
# create a webserver that works against them
+++ /dev/null
-import os, signal, time
-from random import randrange
-
-from twisted.internet import reactor, defer
-from twisted.python import failure
-
-def insecurerandstr(n):
- return ''.join(map(chr, map(randrange, [0]*n, [256]*n)))
-
-def flip_bit(good, which):
- # flip the low-order bit of good[which]
- if which == -1:
- pieces = good[:which], good[-1:], ""
- else:
- pieces = good[:which], good[which:which+1], good[which+1:]
- return pieces[0] + chr(ord(pieces[1]) ^ 0x01) + pieces[2]
-
-def flip_one_bit(s, offset=0, size=None):
- """ flip one random bit of the string s, in a byte greater than or equal to offset and less
- than offset+size. """
- if size is None:
- size=len(s)-offset
- i = randrange(offset, offset+size)
- result = s[:i] + chr(ord(s[i])^(0x01<<randrange(0, 8))) + s[i+1:]
- assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
- return result
-
-class SignalMixin:
- # This class is necessary for any code which wants to use Processes
- # outside the usual reactor.run() environment. It is copied from
- # Twisted's twisted.test.test_process
- sigchldHandler = None
-
- def setUpClass(self):
- # make sure SIGCHLD handler is installed, as it should be on
- # reactor.run(). problem is reactor may not have been run when this
- # test runs.
- if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
- self.sigchldHandler = signal.signal(signal.SIGCHLD,
- reactor._handleSigchld)
-
- def tearDownClass(self):
- if self.sigchldHandler:
- signal.signal(signal.SIGCHLD, self.sigchldHandler)
-
-class StallMixin:
- def stall(self, res=None, delay=1):
- d = defer.Deferred()
- reactor.callLater(delay, d.callback, res)
- return d
-
-class ShouldFailMixin:
-
- def shouldFail(self, expected_failure, which, substring,
- callable, *args, **kwargs):
- assert substring is None or isinstance(substring, str)
- d = defer.maybeDeferred(callable, *args, **kwargs)
- def done(res):
- if isinstance(res, failure.Failure):
- res.trap(expected_failure)
- if substring:
- self.failUnless(substring in str(res),
- "%s: substring '%s' not in '%s'"
- % (which, substring, str(res)))
- # return the Failure for further analysis, but in a form that
- # doesn't make the Deferred chain think that we failed.
- return [res]
- else:
- self.fail("%s was supposed to raise %s, not get '%s'" %
- (which, expected_failure, res))
- d.addBoth(done)
- return d
-
-
-class TestMixin(SignalMixin):
- def setUp(self, repeatable=False):
- """
- @param repeatable: install the repeatable_randomness hacks to attempt
- to without access to real randomness and real time.time from the
- code under test
- """
- self.repeatable = repeatable
- if self.repeatable:
- import repeatable_random
- repeatable_random.force_repeatability()
- if hasattr(time, 'realtime'):
- self.teststarttime = time.realtime()
- else:
- self.teststarttime = time.time()
-
- def tearDown(self):
- if self.repeatable:
- import repeatable_random
- repeatable_random.restore_non_repeatability()
- self.clean_pending(required_to_quiesce=True)
-
- def clean_pending(self, dummy=None, required_to_quiesce=True):
- """
- This handy method cleans all pending tasks from the reactor.
-
- When writing a unit test, consider the following question:
-
- Is the code that you are testing required to release control once it
- has done its job, so that it is impossible for it to later come around
- (with a delayed reactor task) and do anything further?
-
- If so, then trial will usefully test that for you -- if the code under
- test leaves any pending tasks on the reactor then trial will fail it.
-
- On the other hand, some code is *not* required to release control -- some
- code is allowed to continuously maintain control by rescheduling reactor
- tasks in order to do ongoing work. Trial will incorrectly require that
- code to clean up all its tasks from the reactor.
-
- Most people think that such code should be amended to have an optional
- "shutdown" operation that releases all control, but on the contrary it is
- good design for some code to *not* have a shutdown operation, but instead
- to have a "crash-only" design in which it recovers from crash on startup.
-
- If the code under test is of the "long-running" kind, which is *not*
- required to shutdown cleanly in order to pass tests, then you can simply
- call testutil.clean_pending() at the end of the unit test, and trial will
- be satisfied.
- """
- pending = reactor.getDelayedCalls()
- active = bool(pending)
- for p in pending:
- if p.active():
- p.cancel()
- else:
- print "WEIRNESS! pending timed call not active+!"
- if required_to_quiesce and active:
- self.fail("Reactor was still active when it was required to be quiescent.")
-
-try:
- import win32file
- import win32con
- def make_readonly(path):
- win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_READONLY)
- def make_accessible(path):
- win32file.SetFileAttributes(path, win32con.FILE_ATTRIBUTE_NORMAL)
-except ImportError:
- import stat
- def _make_readonly(path):
- os.chmod(path, stat.S_IREAD)
- os.chmod(os.path.dirname(path), stat.S_IREAD)
- def _make_accessible(path):
- os.chmod(os.path.dirname(path), stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
- os.chmod(path, stat.S_IWRITE | stat.S_IEXEC | stat.S_IREAD)
- make_readonly = _make_readonly
- make_accessible = _make_accessible