# pycrypto 2.4 doesn't work due to <https://bugs.launchpad.net/pycrypto/+bug/881130>
"pycrypto >= 2.1.0, != 2.2, != 2.4",
- # <http://www.voidspace.org.uk/python/mock/>, 0.8.0 provides "call"
- # mock 1.1.x seems to cause problems on several buildslaves.
- "mock >= 0.8.0, <= 1.0.1",
-
# pycryptopp-0.6.0 includes ed25519
"pycryptopp >= 0.6.0",
('simplejson', 'simplejson'),
('pycrypto', 'Crypto'),
('pyasn1', 'pyasn1'),
- ('mock', 'mock'),
('service-identity', 'service_identity'),
('characteristic', 'characteristic'),
('pyasn1-modules', 'pyasn1_modules'),
import os.path
-from twisted.trial import unittest
from cStringIO import StringIO
import urllib, sys
-from mock import Mock, call
+from twisted.trial import unittest
+from twisted.python.monkey import MonkeyPatcher
import allmydata
from allmydata.util import fileutil, hashutil, base32, keyutil
+from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.dirnode import normalize
def test_exception_catcher(self):
self.basedir = "cli/exception_catcher"
- runner_mock = Mock()
- sys_exit_mock = Mock()
stderr = StringIO()
- self.patch(sys, "argv", ["tahoe"])
- self.patch(runner, "runner", runner_mock)
- self.patch(sys, "exit", sys_exit_mock)
- self.patch(sys, "stderr", stderr)
exc = Exception("canary")
+ ns = Namespace()
+ ns.runner_called = False
def call_runner(args, install_node_control=True):
+ ns.runner_called = True
+ self.failUnlessEqual(install_node_control, True)
raise exc
- runner_mock.side_effect = call_runner
- runner.run()
- self.failUnlessEqual(runner_mock.call_args_list, [call([], install_node_control=True)])
- self.failUnlessEqual(sys_exit_mock.call_args_list, [call(1)])
+ ns.sys_exit_called = False
+ def call_sys_exit(exitcode):
+ ns.sys_exit_called = True
+ self.failUnlessEqual(exitcode, 1)
+
+ patcher = MonkeyPatcher((runner, 'runner', call_runner),
+ (sys, 'argv', ["tahoe"]),
+ (sys, 'exit', call_sys_exit),
+ (sys, 'stderr', stderr))
+ patcher.runWithPatches(runner.run)
+
+ self.failUnless(ns.runner_called)
+ self.failUnless(ns.sys_exit_called)
self.failUnlessIn(str(exc), stderr.getvalue())
+
import os.path
-from twisted.trial import unittest
from cStringIO import StringIO
import re
-from mock import patch
+from twisted.trial import unittest
+from twisted.python.monkey import MonkeyPatcher
+import __builtin__
from allmydata.util import fileutil
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv
+from allmydata.util.namespace import Namespace
from allmydata.scripts import cli, backupdb
from .common_util import StallMixin
from .no_network import GridTestMixin
self._check_filtering(filtered, root_listdir, (u'lib.a', u'_darcs', u'subdir'),
(nice_doc,))
- @patch('__builtin__.file')
- def test_exclude_from_tilde_expansion(self, mock):
+ def test_exclude_from_tilde_expansion(self):
basedir = "cli/Backup/exclude_from_tilde_expansion"
fileutil.make_dirs(basedir)
nodeurl_path = os.path.join(basedir, 'node.url')
fileutil.write(nodeurl_path, 'http://example.net:2357/')
- def parse(args): return parse_options(basedir, "backup", args)
# ensure that tilde expansion is performed on exclude-from argument
exclude_file = u'~/.tahoe/excludes.dummy'
- mock.return_value = StringIO()
- parse(['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
- self.failUnlessIn(((abspath_expanduser_unicode(exclude_file),), {}), mock.call_args_list)
+ ns = Namespace()
+ ns.called = False
+ def call_file(name, *args):
+ ns.called = True
+ self.failUnlessEqual(name, abspath_expanduser_unicode(exclude_file))
+ return StringIO()
+
+ patcher = MonkeyPatcher((__builtin__, 'file', call_file))
+ patcher.runWithPatches(parse_options, basedir, "backup", ['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
+ self.failUnless(ns.called)
def test_ignore_symlinks(self):
if not hasattr(os, 'symlink'):
import os, sys
+import twisted
from twisted.trial import unittest
from twisted.application import service
import allmydata
+import allmydata.frontends.drop_upload
+import allmydata.util.log
+
from allmydata.node import Node, OldConfigError, OldConfigOptionError, MissingConfigEntry, UnescapedHashError
from allmydata.frontends.auth import NeedRootcapLookupScheme
from allmydata import client
from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
-import mock
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
client.Client(basedir)
- @mock.patch('twisted.python.log.msg')
- def test_error_on_old_config_files(self, mock_log_msg):
+ def test_error_on_old_config_files(self):
basedir = "test_client.Basic.test_error_on_old_config_files"
os.mkdir(basedir)
fileutil.write(os.path.join(basedir, "tahoe.cfg"),
fileutil.write(os.path.join(basedir, "readonly_storage"), "")
fileutil.write(os.path.join(basedir, "debug_discard_storage"), "")
+ logged_messages = []
+ self.patch(twisted.python.log, 'msg', logged_messages.append)
+
e = self.failUnlessRaises(OldConfigError, client.Client, basedir)
abs_basedir = fileutil.abspath_expanduser_unicode(unicode(basedir)).encode(sys.getfilesystemencoding())
self.failUnlessIn(os.path.join(abs_basedir, "introducer.furl"), e.args[0])
for oldfile in ['introducer.furl', 'no_storage', 'readonly_storage',
'debug_discard_storage']:
- logged = [ m for m in mock_log_msg.call_args_list if
- ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
- self.failUnless(logged, (oldfile, mock_log_msg.call_args_list))
+ logged = [ m for m in logged_messages if
+ ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
+ self.failUnless(logged, (oldfile, logged_messages))
for oldfile in [
'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
'disconnect_timeout', 'advertised_ip_addresses', 'helper.furl',
'key_generator.furl', 'stats_gatherer.furl', 'sizelimit',
'run_helper']:
- logged = [ m for m in mock_log_msg.call_args_list if
- ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
- self.failIf(logged, oldfile)
+ logged = [ m for m in logged_messages if
+ ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
+ self.failIf(logged, (oldfile, logged_messages))
def test_secrets(self):
basedir = "test_client.Basic.test_secrets"
_check("helper.furl = None", None)
_check("helper.furl = pb://blah\n", "pb://blah")
- @mock.patch('allmydata.util.log.msg')
- @mock.patch('allmydata.frontends.drop_upload.DropUploader')
- def test_create_drop_uploader(self, mock_drop_uploader, mock_log_msg):
+ def test_create_drop_uploader(self):
class MockDropUploader(service.MultiService):
name = 'drop-upload'
self.local_dir_utf8 = local_dir_utf8
self.inotify = inotify
- mock_drop_uploader.side_effect = MockDropUploader
+ self.patch(allmydata.frontends.drop_upload, 'DropUploader', MockDropUploader)
upload_dircap = "URI:DIR2:blah"
local_dir_utf8 = u"loc\u0101l_dir".encode('utf-8')
class Boom(Exception):
pass
- mock_drop_uploader.side_effect = Boom()
+ def BoomDropUploader(client, upload_dircap, local_dir_utf8, inotify=None):
+ raise Boom()
+
+ logged_messages = []
+ def mock_log(*args, **kwargs):
+ logged_messages.append("%r %r" % (args, kwargs))
+ self.patch(allmydata.util.log, 'msg', mock_log)
+ self.patch(allmydata.frontends.drop_upload, 'DropUploader', BoomDropUploader)
basedir2 = "test_client.Basic.test_create_drop_uploader2"
os.mkdir(basedir2)
fileutil.write(os.path.join(basedir2, "private", "drop_upload_dircap"), "URI:DIR2:blah")
c2 = client.Client(basedir2)
self.failUnlessRaises(KeyError, c2.getServiceNamed, 'drop-upload')
- self.failUnless([True for arg in mock_log_msg.call_args_list if "Boom" in repr(arg)],
- mock_log_msg.call_args_list)
+ self.failUnless([True for arg in logged_messages if "Boom" in arg],
+ logged_messages)
def flush_but_dont_ignore(res):
shutil.rmtree(tmpdir)
sys.exit(0)
-from twisted.trial import unittest
-from mock import patch
+
import os, sys, locale
+from twisted.trial import unittest
+
from allmydata.test.common_util import ReallyEqualMixin
from allmydata.util import encodingutil, fileutil
from allmydata.util.encodingutil import argv_to_unicode, unicode_to_url, \
from twisted.python import usage
+
+class MockStdout(object):
+ pass
+
class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
+ def test_get_io_encoding(self):
+ mock_stdout = MockStdout()
+ self.patch(sys, 'stdout', mock_stdout)
- @patch('sys.stdout')
- def test_get_io_encoding(self, mock_stdout):
mock_stdout.encoding = 'UTF-8'
_reload()
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
else:
self.failUnlessRaises(AssertionError, _reload)
- @patch('locale.getpreferredencoding')
- def test_get_io_encoding_not_from_stdout(self, mock_locale_getpreferredencoding):
- locale # hush pyflakes
- mock_locale_getpreferredencoding.return_value = 'koi8-r'
+ def test_get_io_encoding_not_from_stdout(self):
+ preferredencoding = 'koi8-r'
+ def call_locale_getpreferredencoding():
+ return preferredencoding
+ self.patch(locale, 'getpreferredencoding', call_locale_getpreferredencoding)
+ mock_stdout = MockStdout()
+ self.patch(sys, 'stdout', mock_stdout)
- class DummyStdout:
- pass
- old_stdout = sys.stdout
- sys.stdout = DummyStdout()
- try:
- expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), expected)
+ expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), expected)
- sys.stdout.encoding = None
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), expected)
+ mock_stdout.encoding = None
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), expected)
- mock_locale_getpreferredencoding.return_value = None
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
- finally:
- sys.stdout = old_stdout
+ preferredencoding = None
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
def test_argv_to_unicode(self):
encodingutil.io_encoding = 'utf-8'
encodingutil.io_encoding = 'koi8-r'
self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
- @patch('os.listdir')
- def test_no_unicode_normalization(self, mock):
+ def test_no_unicode_normalization(self):
# Pretend to run on a Unicode platform.
- # We normalized to NFC in 1.7beta, but we now don't.
- orig_platform = sys.platform
- try:
- sys.platform = 'darwin'
- mock.return_value = [Artonwall_nfd]
- _reload()
- self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
- finally:
- sys.platform = orig_platform
+ # listdir_unicode normalized to NFC in 1.7beta, but now doesn't.
+
+ def call_os_listdir(path):
+ return [Artonwall_nfd]
+ self.patch(os, 'listdir', call_os_listdir)
+ self.patch(sys, 'platform', 'darwin')
+
+ _reload()
+ self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
+
# The following tests apply only to platforms that don't store filenames as
# Unicode entities on the filesystem.
sys.platform = self.original_platform
_reload()
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
+ def test_listdir_unicode(self):
# What happens if latin1-encoded filenames are encountered on an UTF-8
# filesystem?
- mock_listdir.return_value = [
- lumiere_nfc.encode('utf-8'),
- lumiere_nfc.encode('latin1')]
+ def call_os_listdir(path):
+ return [
+ lumiere_nfc.encode('utf-8'),
+ lumiere_nfc.encode('latin1')
+ ]
+ self.patch(os, 'listdir', call_os_listdir)
+
+ sys_filesystemencoding = 'utf-8'
+ def call_sys_getfilesystemencoding():
+ return sys_filesystemencoding
+ self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
- mock_getfilesystemencoding.return_value = 'utf-8'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
# We're trying to list a directory whose name cannot be represented in
# the filesystem encoding. This should fail.
- mock_getfilesystemencoding.return_value = 'ascii'
+ sys_filesystemencoding = 'ascii'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
sys.platform = self.original_platform
_reload()
- @patch('sys.stdout')
- def test_argv_to_unicode(self, mock):
+ def test_argv_to_unicode(self):
if 'argv' not in dir(self):
return
- mock.encoding = self.io_encoding
+ mock_stdout = MockStdout()
+ mock_stdout.encoding = self.io_encoding
+ self.patch(sys, 'stdout', mock_stdout)
+
argu = lumiere_nfc
argv = self.argv
_reload()
def test_unicode_to_url(self):
self.failUnless(unicode_to_url(lumiere_nfc), "lumi\xc3\xa8re")
- @patch('sys.stdout')
- def test_unicode_to_output(self, mock):
+ def test_unicode_to_output(self):
if 'argv' not in dir(self):
return
- mock.encoding = self.io_encoding
+ mock_stdout = MockStdout()
+ mock_stdout.encoding = self.io_encoding
+ self.patch(sys, 'stdout', mock_stdout)
+
_reload()
self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), self.argv)
_reload()
self.failUnlessReallyEqual(unicode_platform(), matrix[self.platform])
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
+ def test_listdir_unicode(self):
if 'dirlist' not in dir(self):
return
"that we are testing for the benefit of a different platform."
% (self.filesystem_encoding,))
- mock_listdir.return_value = self.dirlist
- mock_getfilesystemencoding.return_value = self.filesystem_encoding
+ def call_os_listdir(path):
+ return self.dirlist
+ self.patch(os, 'listdir', call_os_listdir)
+
+ def call_sys_getfilesystemencoding():
+ return self.filesystem_encoding
+ self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
_reload()
filenames = listdir_unicode(u'/dummy')
+
import random
from twisted.trial import unittest
from twisted.internet import defer
-import mock
from foolscap.api import eventually
from allmydata.test import common
from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
+
+class MockShareHashTree(object):
+ def needed_hashes(self):
+ return False
+
class MockNode(object):
def __init__(self, check_reneging, check_fetch_failed):
self.got = 0
self.check_fetch_failed = check_fetch_failed
self._si_prefix='aa'
self.have_UEB = True
- self.share_hash_tree = mock.Mock()
- self.share_hash_tree.needed_hashes.return_value = False
+ self.share_hash_tree = MockShareHashTree()
self.on_want_more_shares = None
def when_finished(self):
rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
vcap = rcap.get_verify_cap()
+ class MockBuckets(object):
+ pass
+
class MockServer(object):
def __init__(self, buckets):
self.version = {
self.s.hungry()
eventually(_give_buckets_and_hunger_again)
return d
+
class MockIServer(object):
def __init__(self, serverid, rref):
self.serverid = serverid
def get_version(self):
return self.rref.version
- mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
+ class MockStorageBroker(object):
+ def __init__(self, servers):
+ self.servers = servers
+ def get_servers_for_psi(self, si):
+ return self.servers
+
+ class MockDownloadStatus(object):
+ def add_dyhb_request(self, server, when):
+ return MockDYHBEvent()
+
+ class MockDYHBEvent(object):
+ def finished(self, shnums, when):
+ pass
+
+ mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
mockserver2 = MockServer({})
- mockserver3 = MockServer({3: mock.Mock()})
- mockstoragebroker = mock.Mock()
+ mockserver3 = MockServer({3: MockBuckets()})
servers = [ MockIServer("ms1", mockserver1),
MockIServer("ms2", mockserver2),
MockIServer("ms3", mockserver3), ]
- mockstoragebroker.get_servers_for_psi.return_value = servers
- mockdownloadstatus = mock.Mock()
+ mockstoragebroker = MockStorageBroker(servers)
+ mockdownloadstatus = MockDownloadStatus()
mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
from twisted.trial import unittest
+from twisted.python.monkey import MonkeyPatcher
import allmydata
-import mock
+import __builtin__
-real_import_func = __import__
class T(unittest.TestCase):
- @mock.patch('__builtin__.__import__')
- def test_report_import_error(self, mockimport):
+ def test_report_import_error(self):
+ real_import_func = __import__
def raiseIE_from_this_particular_func(name, *args):
if name == "foolscap":
marker = "wheeeyo"
else:
return real_import_func(name, *args)
- mockimport.side_effect = raiseIE_from_this_particular_func
+ # Let's run as little code as possible with __import__ patched.
+ patcher = MonkeyPatcher((__builtin__, '__import__', raiseIE_from_this_particular_func))
+ vers_and_locs = patcher.runWithPatches(allmydata.get_package_versions_and_locations)
- vers_and_locs = allmydata.get_package_versions_and_locations()
for (pkgname, stuff) in vers_and_locs:
if pkgname == 'foolscap':
self.failUnless('wheeeyo' in str(stuff[2]), stuff)
-import mock
from twisted.trial.unittest import TestCase
from allmydata.web.common import get_filenode_metadata, SDMF_VERSION, MDMF_VERSION
-class CommonFixture(object):
- def setUp(self):
- self.mockfilenode = mock.Mock()
+class MockFileNode(object):
+ def __init__(self, size, mutable_version=None):
+ self.size = size
+ self.mutable_version = mutable_version
+
+ def get_size(self):
+ return self.size
+
+ def is_mutable(self):
+ return self.mutable_version is not None
+ def get_version(self):
+ if self.mutable_version is None:
+ raise AttributeError()
+ return self.mutable_version
+
+
+class CommonFixture(object):
def test_size_is_0(self):
"""If get_size doesn't return None the returned metadata must contain "size"."""
- self.mockfilenode.get_size.return_value = 0
- metadata = get_filenode_metadata(self.mockfilenode)
- self.failUnlessIn('size', metadata)
+ mockfilenode = MockFileNode(0, mutable_version=self.mutable_version)
+ metadata = get_filenode_metadata(mockfilenode)
+ self.failUnlessEqual(metadata['size'], 0)
def test_size_is_1000(self):
"""1000 is sufficiently large to guarantee the cap is not a literal."""
- self.mockfilenode.get_size.return_value = 1000
- metadata = get_filenode_metadata(self.mockfilenode)
- self.failUnlessIn('size', metadata)
+ mockfilenode = MockFileNode(1000, mutable_version=self.mutable_version)
+ metadata = get_filenode_metadata(mockfilenode)
+ self.failUnlessEqual(metadata['size'], 1000)
def test_size_is_None(self):
"""If get_size returns None the returned metadata must not contain "size"."""
- self.mockfilenode.get_size.return_value = None
- metadata = get_filenode_metadata(self.mockfilenode)
+ mockfilenode = MockFileNode(None, mutable_version=self.mutable_version)
+ metadata = get_filenode_metadata(mockfilenode)
self.failIfIn('size', metadata)
class Test_GetFileNodeMetaData_Immutable(CommonFixture, TestCase):
def setUp(self):
- CommonFixture.setUp(self)
- self.mockfilenode.is_mutable.return_value = False
+ self.mutable_version = None
class Test_GetFileNodeMetaData_SDMF(CommonFixture, TestCase):
def setUp(self):
- CommonFixture.setUp(self)
- self.mockfilenode.is_mutable.return_value = True
- self.mockfilenode.get_version.return_value = SDMF_VERSION
+ self.mutable_version = SDMF_VERSION
class Test_GetFileNodeMetaData_MDMF(CommonFixture, TestCase):
def setUp(self):
- CommonFixture.setUp(self)
- self.mockfilenode.is_mutable.return_value = True
- self.mockfilenode.get_version.return_value = MDMF_VERSION
+ self.mutable_version = MDMF_VERSION
import os, stat, sys, time
+
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
-from mock import patch
-
from foolscap.api import flushEventualQueue
+import foolscap.logging.log
+
from twisted.application import service
from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
from allmydata.util import fileutil, iputil
+from allmydata.util.namespace import Namespace
import allmydata.test.common_util as testutil
+
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
pass
bits = stat.S_IMODE(st[stat.ST_MODE])
self.failUnless(bits & 0001 == 0, bits)
- @patch("foolscap.logging.log.setLogDir")
- def test_logdir_is_str(self, mock_setLogDir):
+ def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
fileutil.make_dirs(basedir)
+ ns = Namespace()
+ ns.called = False
def call_setLogDir(logdir):
+ ns.called = True
self.failUnless(isinstance(logdir, str), logdir)
- mock_setLogDir.side_effect = call_setLogDir
+ self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
TestNode(basedir)
- self.failUnless(mock_setLogDir.called)
+ self.failUnless(ns.called)
-import time, os.path, platform, stat, re, simplejson, struct, shutil
-import mock
+import time, os.path, platform, stat, re, simplejson, struct, shutil
from twisted.trial import unittest
fileutil.write(final, share_file_data)
- mockstorageserver = mock.Mock()
+ class MockStorageServer(object):
+ def add_latency(self, category, latency):
+ pass
+ def count(self, name, delta=1):
+ pass
+
+ mockstorageserver = MockStorageServer()
# Now read from it.
br = BucketReader(mockstorageserver, final)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_reserved_space(self, mock_get_disk_stats):
- reserved_space=10000
- mock_get_disk_stats.return_value = {
- 'free_for_nonroot': 15000,
- 'avail': max(15000 - reserved_space, 0),
+ def test_reserved_space(self):
+ reserved = 10000
+ allocated = 0
+
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ self.failUnlessEqual(reserved_space, reserved)
+ return {
+ 'free_for_nonroot': 15000 - allocated,
+ 'avail': max(15000 - allocated - reserved_space, 0),
}
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
- ss = self.create("test_reserved_space", reserved_space=reserved_space)
+ ss = self.create("test_reserved_space", reserved_space=reserved)
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
+ # this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE
- # we have to manually increase available, since we're not doing real
- # disk measurements
- mock_get_disk_stats.return_value = {
- 'free_for_nonroot': 15000 - allocated,
- 'avail': max(15000 - allocated - reserved_space, 0),
- }
-
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
d = self.render1(page, args={"t": ["json"]})
return d
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_no_disk_stats(self, mock_get_disk_stats):
- mock_get_disk_stats.side_effect = AttributeError()
+ def test_status_no_disk_stats(self):
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ raise AttributeError()
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# Some platforms may have no disk stats API. Make sure the code can handle that
# (test runs on all platforms).
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnless(ss.get_available_space() is None)
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_bad_disk_stats(self, mock_get_disk_stats):
- mock_get_disk_stats.side_effect = OSError()
+ def test_status_bad_disk_stats(self):
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ raise OSError()
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# If the API to get disk stats exists but a call to it fails, then the status should
# show that no shares will be accepted, and get_available_space() should be 0.
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnlessEqual(ss.get_available_space(), 0)
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_right_disk_stats(self, mock_get_disk_stats):
+ def test_status_right_disk_stats(self):
GB = 1000000000
total = 5*GB
free_for_root = 4*GB
free_for_nonroot = 3*GB
- reserved_space = 1*GB
- used = total - free_for_root
- avail = max(free_for_nonroot - reserved_space, 0)
- mock_get_disk_stats.return_value = {
- 'total': total,
- 'free_for_root': free_for_root,
- 'free_for_nonroot': free_for_nonroot,
- 'used': used,
- 'avail': avail,
- }
+ reserved = 1*GB
basedir = "storage/WebStatus/status_right_disk_stats"
fileutil.make_dirs(basedir)
- ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
+ ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved)
expecteddir = ss.sharedir
+
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ self.failUnlessEqual(whichdir, expecteddir)
+ self.failUnlessEqual(reserved_space, reserved)
+ used = total - free_for_root
+ avail = max(free_for_nonroot - reserved_space, 0)
+ return {
+ 'total': total,
+ 'free_for_root': free_for_root,
+ 'free_for_nonroot': free_for_nonroot,
+ 'used': used,
+ 'avail': avail,
+ }
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
+
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
- self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
- mock_get_disk_stats.call_args_list)
-
self.failUnlessIn("<h1>Storage Server Status</h1>", html)
s = remove_tags(html)
self.failUnlessIn("Total disk space: 5.00 GB", s)
import allmydata.web
-import mock
-
# junk to appease pyflakes's outrage
[
accessors, appserver, static, rend, url, util, query, i18n, flat, guard, stan, testutil,
context, flatmdom, flatstan, twist, webform, processors, annotate, iformless, Decimal,
- minidom, allmydata, mock,
+ minidom, allmydata,
]
from allmydata.scripts import runner