"pycrypto == 2.1.0, == 2.3, >= 2.4.1",
"pyasn1 >= 0.0.8a",
- # http://www.voidspace.org.uk/python/mock/ , 0.8.0 provides "call"
- "mock >= 0.8.0",
-
# pycryptopp-0.6.0 includes ed25519
"pycryptopp >= 0.6.0",
('simplejson', 'simplejson'),
('pycrypto', 'Crypto'),
('pyasn1', 'pyasn1'),
- ('mock', 'mock'),
('txAWS', 'txaws'),
('oauth2client', 'oauth2client'),
('python-dateutil', 'dateutil'),
import os.path
-from twisted.trial import unittest
from cStringIO import StringIO
import urllib, re, sys
import simplejson
-from mock import patch, Mock, call
+from twisted.trial import unittest
+from twisted.python.monkey import MonkeyPatcher
+import __builtin__
from allmydata.util import fileutil, hashutil, base32, keyutil
+from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.interfaces import MDMF_VERSION, SDMF_VERSION
def test_exception_catcher(self):
self.basedir = "cli/exception_catcher"
- runner_mock = Mock()
- sys_exit_mock = Mock()
stderr = StringIO()
- self.patch(sys, "argv", ["tahoe"])
- self.patch(runner, "runner", runner_mock)
- self.patch(sys, "exit", sys_exit_mock)
- self.patch(sys, "stderr", stderr)
exc = Exception("canary")
+ ns = Namespace()
+ ns.runner_called = False
def call_runner(args, install_node_control=True):
+ ns.runner_called = True
+ self.failUnlessEqual(install_node_control, True)
raise exc
- runner_mock.side_effect = call_runner
- runner.run()
- self.failUnlessEqual(runner_mock.call_args_list, [call([], install_node_control=True)])
- self.failUnlessEqual(sys_exit_mock.call_args_list, [call(1)])
+ ns.sys_exit_called = False
+ def call_sys_exit(exitcode):
+ ns.sys_exit_called = True
+ self.failUnlessEqual(exitcode, 1)
+
+ patcher = MonkeyPatcher((runner, 'runner', call_runner),
+ (sys, 'argv', ["tahoe"]),
+ (sys, 'exit', call_sys_exit),
+ (sys, 'stderr', stderr))
+ patcher.runWithPatches(runner.run)
+
+ self.failUnless(ns.runner_called)
+ self.failUnless(ns.sys_exit_called)
self.failUnlessIn(str(exc), stderr.getvalue())
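The rewritten tests above rely on a small Namespace helper imported from allmydata.util.namespace, which is not shown in these hunks. A minimal sketch of such a helper, assuming it is nothing more than a mutable attribute bag (the real module added by this patch may differ):

# Hypothetical sketch of allmydata/util/namespace.py; not part of these hunks.
class Namespace(object):
    """An empty attribute container.

    Python 2 closures cannot rebind variables in an enclosing function, so
    nested stub functions record what happened by mutating a shared
    Namespace instance instead (e.g. ns.runner_called = True).
    """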
self._check_filtering(filtered, root_listdir, (u'lib.a', u'_darcs', u'subdir'),
(nice_doc,))
- @patch('__builtin__.file')
- def test_exclude_from_tilde_expansion(self, mock):
+ def test_exclude_from_tilde_expansion(self):
basedir = "cli/Backup/exclude_from_tilde_expansion"
fileutil.make_dirs(basedir)
nodeurl_path = os.path.join(basedir, 'node.url')
fileutil.write(nodeurl_path, 'http://example.net:2357/')
- def parse(args): return parse_options(basedir, "backup", args)
# ensure that tilde expansion is performed on exclude-from argument
exclude_file = u'~/.tahoe/excludes.dummy'
- mock.return_value = StringIO()
- parse(['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
- self.failUnlessIn(((abspath_expanduser_unicode(exclude_file),), {}), mock.call_args_list)
+ ns = Namespace()
+ ns.called = False
+ def call_file(name, *args):
+ ns.called = True
+ self.failUnlessEqual(name, abspath_expanduser_unicode(exclude_file))
+ return StringIO()
+
+ patcher = MonkeyPatcher((__builtin__, 'file', call_file))
+ patcher.runWithPatches(parse_options, basedir, "backup", ['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
+ self.failUnless(ns.called)
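For reference, twisted.python.monkey.MonkeyPatcher (used above instead of mock.patch) takes (object, attribute_name, new_value) tuples, and runWithPatches(f, *args) applies the patches, calls f, and restores the original attributes in a finally block, so the patched values cannot leak even if f raises. A small standalone illustration, not part of the patch:

import sys
from twisted.python.monkey import MonkeyPatcher

def explode():
    raise RuntimeError("boom")

patcher = MonkeyPatcher((sys, 'platform', 'fake-platform'))
try:
    patcher.runWithPatches(explode)
except RuntimeError:
    pass
# sys.platform has already been restored at this point.
assert sys.platform != 'fake-platform'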
def test_ignore_symlinks(self):
if not hasattr(os, 'symlink'):
import os
+
+import twisted
from twisted.trial import unittest
from twisted.application import service
import allmydata
from allmydata.node import OldConfigError, OldConfigOptionError, InvalidValueError, MissingConfigEntry
+from allmydata.util.namespace import Namespace
+import allmydata.frontends.drop_upload
+import allmydata.util.log
from allmydata import client
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.backends.disk.disk_backend import DiskBackend
from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
+from allmydata.storage.backends.cloud.s3 import s3_container
+from allmydata.storage.backends.cloud.openstack import openstack_container
+from allmydata.storage.backends.cloud.googlestorage import googlestorage_container
+from allmydata.storage.backends.cloud.msazure import msazure_container
from allmydata.util import base32, fileutil
from allmydata.interfaces import IFilesystemNode, IFileNode, \
IImmutableFileNode, IMutableFileNode, IDirectoryNode
from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
-import mock
-
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
server = c.getServiceNamed("storage")
self.failUnless(isinstance(server.backend, DiskBackend), server.backend)
- @mock.patch('twisted.python.log.msg')
- def test_error_on_old_config_files(self, mock_log_msg):
+ def test_error_on_old_config_files(self):
basedir = "test_client.Basic.test_error_on_old_config_files"
os.mkdir(basedir)
fileutil.write(os.path.join(basedir, "tahoe.cfg"),
fileutil.write(os.path.join(basedir, "readonly_storage"), "")
fileutil.write(os.path.join(basedir, "debug_discard_storage"), "")
+ logged_messages = []
+ self.patch(twisted.python.log, 'msg', logged_messages.append)
+
e = self.failUnlessRaises(OldConfigError, client.Client, basedir)
self.failUnlessIn(os.path.abspath(os.path.join(basedir, "introducer.furl")), e.args[0])
self.failUnlessIn(os.path.abspath(os.path.join(basedir, "no_storage")), e.args[0])
for oldfile in ['introducer.furl', 'no_storage', 'readonly_storage',
'debug_discard_storage']:
- logged = [ m for m in mock_log_msg.call_args_list if
- ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
- self.failUnless(logged, (oldfile, mock_log_msg.call_args_list))
+ logged = [ m for m in logged_messages if
+ ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
+ self.failUnless(logged, (oldfile, logged_messages))
for oldfile in [
'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
'disconnect_timeout', 'advertised_ip_addresses', 'helper.furl',
'key_generator.furl', 'stats_gatherer.furl', 'sizelimit',
'run_helper']:
- logged = [ m for m in mock_log_msg.call_args_list if
- ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
- self.failIf(logged, oldfile)
+ logged = [ m for m in logged_messages if
+ ("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
+ self.failIf(logged, (oldfile, logged_messages))
def test_secrets(self):
basedir = "test_client.Basic.test_secrets"
fileutil.make_dirs(os.path.join(basedir, "private"))
fileutil.write(os.path.join(basedir, "private", filename), secret)
- @mock.patch('allmydata.storage.backends.cloud.s3.s3_container.S3Container')
- def test_s3_config_good_defaults(self, mock_S3Container):
+ def test_s3_config_good_defaults(self):
+ ns = Namespace()
+ class MockS3Container(object):
+ def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None, override_reactor=None):
+ ns.container_call_args = (access_key, secret_key, url, container_name, usertoken, producttoken)
+
+ self.patch(s3_container, 'S3Container', MockS3Container)
+
basedir = "client.Basic.test_s3_config_good_defaults"
os.mkdir(basedir)
self._write_secret(basedir, "s3secret")
fileutil.write(os.path.join(basedir, "tahoe.cfg"), config)
c = client.Client(basedir)
- mock_S3Container.assert_called_with("keyid", "dummy", "http://s3.amazonaws.com", "test", None, None)
+ self.failUnlessEqual(ns.container_call_args, ("keyid", "dummy", "http://s3.amazonaws.com", "test", None, None))
server = c.getServiceNamed("storage")
self.failUnless(isinstance(server.backend, CloudBackend), server.backend)
- mock_S3Container.reset_mock()
self._write_secret(basedir, "s3producttoken", secret="{ProductToken}")
self.failUnlessRaises(InvalidValueError, client.Client, basedir)
- mock_S3Container.reset_mock()
self._write_secret(basedir, "s3usertoken", secret="{UserToken}")
fileutil.write(os.path.join(basedir, "tahoe.cfg"), config + "s3.url = http://s3.example.com\n")
+ ns.container_call_args = None
c = client.Client(basedir)
- mock_S3Container.assert_called_with("keyid", "dummy", "http://s3.example.com", "test",
- "{UserToken}", "{ProductToken}")
+ self.failUnlessEqual(ns.container_call_args, ("keyid", "dummy", "http://s3.example.com", "test",
+ "{UserToken}", "{ProductToken}"))
def test_s3_readonly_bad(self):
basedir = "client.Basic.test_s3_readonly_bad"
"s3.bucket = test\n")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir)
- @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticatorV2')
- @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticationClient')
- @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.OpenStackContainer')
- def test_openstack_config_good_defaults(self, mock_OpenStackContainer, mock_AuthenticationClient,
- mock_Authenticator):
+ def test_openstack_config_good_defaults(self):
+ ns = Namespace()
+ class MockAuthenticatorV2(object):
+ def __init__(self, auth_service_url, credentials):
+ ns.authenticator_call_args = (auth_service_url, credentials)
+ class MockAuthenticationClient(object):
+ def __init__(self, authenticator, reauth_period, override_reactor=None):
+ ns.authclient_call_args = (authenticator, reauth_period)
+ class MockOpenStackContainer(object):
+ def __init__(self, auth_client, container_name, override_reactor=None):
+ ns.container_call_args = (auth_client, container_name)
+
+ self.patch(openstack_container, 'AuthenticatorV2', MockAuthenticatorV2)
+ self.patch(openstack_container, 'AuthenticationClient', MockAuthenticationClient)
+ self.patch(openstack_container, 'OpenStackContainer', MockOpenStackContainer)
+
basedir = "client.Basic.test_openstack_config_good_defaults"
os.mkdir(basedir)
self._write_secret(basedir, "openstack_api_key")
fileutil.write(os.path.join(basedir, "tahoe.cfg"), config)
c = client.Client(basedir)
- mock_Authenticator.assert_called_with("https://identity.api.rackspacecloud.com/v2.0/tokens",
- {'RAX-KSKEY:apiKeyCredentials': {'username': 'alex', 'apiKey': 'dummy'}})
- authclient_call_args = mock_AuthenticationClient.call_args_list
- self.failUnlessEqual(len(authclient_call_args), 1)
- self.failUnlessEqual(authclient_call_args[0][0][1:], (11*60*60,))
- container_call_args = mock_OpenStackContainer.call_args_list
- self.failUnlessEqual(len(container_call_args), 1)
- self.failUnlessEqual(container_call_args[0][0][1:], ("test",))
+ self.failUnlessEqual(ns.authenticator_call_args, ("https://identity.api.rackspacecloud.com/v2.0/tokens",
+ {'RAX-KSKEY:apiKeyCredentials': {'username': 'alex', 'apiKey': 'dummy'}}))
+ self.failUnless(isinstance(ns.authclient_call_args[0], MockAuthenticatorV2), ns.authclient_call_args)
+ self.failUnlessEqual(ns.authclient_call_args[1 :], (11*60*60,))
+ self.failUnless(isinstance(ns.container_call_args[0], MockAuthenticationClient), ns.container_call_args)
+ self.failUnlessEqual(ns.container_call_args[1 :], ("test",))
server = c.getServiceNamed("storage")
self.failUnless(isinstance(server.backend, CloudBackend), server.backend)
"googlestorage.project_id = 456\n")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir)
- @mock.patch('allmydata.storage.backends.cloud.googlestorage.googlestorage_container.AuthenticationClient')
- @mock.patch('allmydata.storage.backends.cloud.googlestorage.googlestorage_container.GoogleStorageContainer')
- def test_googlestorage_config(self, mock_OpenStackContainer, mock_AuthenticationClient):
+ def test_googlestorage_config(self):
"""
Given good configuration, we correctly configure a good GoogleStorageContainer.
"""
+ ns = Namespace()
+ class MockAuthenticationClient(object):
+ def __init__(self, account_name, private_key, private_key_password='notasecret',
+ _credentialsClass=None, _deferToThread=None):
+ ns.authclient_call_args = (account_name, private_key, private_key_password)
+ class MockGoogleStorageContainer(object):
+ def __init__(self, auth_client, project_id, bucket_name, override_reactor=None):
+ ns.container_call_args = (auth_client, project_id, bucket_name)
+
+ self.patch(googlestorage_container, 'AuthenticationClient', MockAuthenticationClient)
+ self.patch(googlestorage_container, 'GoogleStorageContainer', MockGoogleStorageContainer)
+
basedir = self.mktemp()
os.mkdir(basedir)
self._write_secret(basedir, "googlestorage_private_key", "sekrit")
c = client.Client(basedir)
server = c.getServiceNamed("storage")
self.failUnless(isinstance(server.backend, CloudBackend), server.backend)
- # Protect against typos with isinstance(), because mock is dangerous.
- self.assertFalse(isinstance(mock_AuthenticationClient.assert_called_once_with,
- mock.Mock))
- mock_AuthenticationClient.assert_called_once_with("u@example.com", "sekrit")
- self.assertFalse(isinstance(mock_OpenStackContainer.assert_called_once_with,
- mock.Mock))
- mock_OpenStackContainer.assert_called_once_with(mock_AuthenticationClient.return_value,
- "456", "bucket")
+ self.failUnlessEqual(ns.authclient_call_args, ("u@example.com", "sekrit", "notasecret"))
+ self.failUnless(isinstance(ns.container_call_args[0], MockAuthenticationClient), ns.container_call_args)
+ self.failUnlessEqual(ns.container_call_args[1 :], ("456", "bucket"))
def test_msazure_config_required(self):
"""
"msazure.container = bucket\n")
self.failUnlessRaises(MissingConfigEntry, client.Client, basedir)
- @mock.patch('allmydata.storage.backends.cloud.msazure.msazure_container.MSAzureStorageContainer')
- def test_msazure_config(self, mock_MSAzureStorageContainer):
+ def test_msazure_config(self):
"""
Given good configuration, we correctly configure a good MSAzureStorageContainer.
"""
+ ns = Namespace()
+ class MockMSAzureStorageContainer(object):
+ def __init__(self, account_name, account_key, container_name, override_reactor=None):
+ ns.container_call_args = (account_name, account_key, container_name)
+
+ self.patch(msazure_container, 'MSAzureStorageContainer', MockMSAzureStorageContainer)
+
basedir = self.mktemp()
os.mkdir(basedir)
self._write_secret(basedir, "msazure_account_key", "abc")
c = client.Client(basedir)
server = c.getServiceNamed("storage")
self.failUnless(isinstance(server.backend, CloudBackend), server.backend)
- # Protect against typos with isinstance(), because mock is dangerous.
- self.assertFalse(isinstance(
- mock_MSAzureStorageContainer.assert_called_once_with, mock.Mock))
- mock_MSAzureStorageContainer.assert_called_once_with(
- "theaccount", "abc", "bucket")
+ self.failUnlessEqual(ns.container_call_args, ("theaccount", "abc", "bucket"))
def test_expire_mutable_false_unsupported(self):
basedir = "client.Basic.test_expire_mutable_false_unsupported"
_check("helper.furl = None", None)
_check("helper.furl = pb://blah\n", "pb://blah")
- @mock.patch('allmydata.util.log.msg')
- @mock.patch('allmydata.frontends.drop_upload.DropUploader')
- def test_create_drop_uploader(self, mock_drop_uploader, mock_log_msg):
+ def test_create_drop_uploader(self):
class MockDropUploader(service.MultiService):
name = 'drop-upload'
self.local_dir_utf8 = local_dir_utf8
self.inotify = inotify
- mock_drop_uploader.side_effect = MockDropUploader
+ self.patch(allmydata.frontends.drop_upload, 'DropUploader', MockDropUploader)
upload_dircap = "URI:DIR2:blah"
local_dir_utf8 = u"loc\u0101l_dir".encode('utf-8')
class Boom(Exception):
pass
- mock_drop_uploader.side_effect = Boom()
+ def BoomDropUploader(client, upload_dircap, local_dir_utf8, inotify=None):
+ raise Boom()
+
+ logged_messages = []
+ def mock_log(*args, **kwargs):
+ logged_messages.append("%r %r" % (args, kwargs))
+ self.patch(allmydata.util.log, 'msg', mock_log)
+ self.patch(allmydata.frontends.drop_upload, 'DropUploader', BoomDropUploader)
basedir2 = "test_client.Basic.test_create_drop_uploader2"
os.mkdir(basedir2)
fileutil.write(os.path.join(basedir2, "private", "drop_upload_dircap"), "URI:DIR2:blah")
c2 = client.Client(basedir2)
self.failUnlessRaises(KeyError, c2.getServiceNamed, 'drop-upload')
- self.failUnless([True for arg in mock_log_msg.call_args_list if "Boom" in repr(arg)],
- mock_log_msg.call_args_list)
+ self.failUnless([True for arg in logged_messages if "Boom" in arg],
+ logged_messages)
def flush_but_dont_ignore(res):
shutil.rmtree(tmpdir)
sys.exit(0)
-from twisted.trial import unittest
-from mock import patch
+
import os, sys, locale
+from twisted.trial import unittest
+
from allmydata.test.common_util import ReallyEqualMixin
from allmydata.util import encodingutil
from allmydata.util.encodingutil import argv_to_unicode, unicode_to_url, \
from twisted.python import usage
+
+class MockStdout(object):
+ pass
+
class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
+ def test_get_io_encoding(self):
+ mock_stdout = MockStdout()
+ self.patch(sys, 'stdout', mock_stdout)
- @patch('sys.stdout')
- def test_get_io_encoding(self, mock_stdout):
mock_stdout.encoding = 'UTF-8'
_reload()
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
else:
self.failUnlessRaises(AssertionError, _reload)
- @patch('locale.getpreferredencoding')
- def test_get_io_encoding_not_from_stdout(self, mock_locale_getpreferredencoding):
- locale # hush pyflakes
- mock_locale_getpreferredencoding.return_value = 'koi8-r'
+ def test_get_io_encoding_not_from_stdout(self):
+ preferredencoding = 'koi8-r'
+ def call_locale_getpreferredencoding():
+ return preferredencoding
+ self.patch(locale, 'getpreferredencoding', call_locale_getpreferredencoding)
+ mock_stdout = MockStdout()
+ self.patch(sys, 'stdout', mock_stdout)
- class DummyStdout:
- pass
- old_stdout = sys.stdout
- sys.stdout = DummyStdout()
- try:
- expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), expected)
+ expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), expected)
- sys.stdout.encoding = None
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), expected)
+ mock_stdout.encoding = None
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), expected)
- mock_locale_getpreferredencoding.return_value = None
- _reload()
- self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
- finally:
- sys.stdout = old_stdout
+ preferredencoding = None
+ _reload()
+ self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
def test_argv_to_unicode(self):
encodingutil.io_encoding = 'utf-8'
encodingutil.io_encoding = 'koi8-r'
self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
- @patch('os.listdir')
- def test_no_unicode_normalization(self, mock):
+ def test_no_unicode_normalization(self):
# Pretend to run on a Unicode platform.
- # We normalized to NFC in 1.7beta, but we now don't.
- orig_platform = sys.platform
- try:
- sys.platform = 'darwin'
- mock.return_value = [Artonwall_nfd]
- _reload()
- self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
- finally:
- sys.platform = orig_platform
+ # listdir_unicode normalized to NFC in 1.7beta, but now doesn't.
+
+ def call_os_listdir(path):
+ return [Artonwall_nfd]
+ self.patch(os, 'listdir', call_os_listdir)
+ self.patch(sys, 'platform', 'darwin')
+
+ _reload()
+ self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
+
# The following tests apply only to platforms that don't store filenames as
# Unicode entities on the filesystem.
sys.platform = self.original_platform
_reload()
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
+ def test_listdir_unicode(self):
# What happens if latin1-encoded filenames are encountered on an UTF-8
# filesystem?
- mock_listdir.return_value = [
- lumiere_nfc.encode('utf-8'),
- lumiere_nfc.encode('latin1')]
+ def call_os_listdir(path):
+ return [
+ lumiere_nfc.encode('utf-8'),
+ lumiere_nfc.encode('latin1')
+ ]
+ self.patch(os, 'listdir', call_os_listdir)
+
+ sys_filesystemencoding = 'utf-8'
+ def call_sys_getfilesystemencoding():
+ return sys_filesystemencoding
+ self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
- mock_getfilesystemencoding.return_value = 'utf-8'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
# We're trying to list a directory whose name cannot be represented in
# the filesystem encoding. This should fail.
- mock_getfilesystemencoding.return_value = 'ascii'
+ sys_filesystemencoding = 'ascii'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
sys.platform = self.original_platform
_reload()
- @patch('sys.stdout')
- def test_argv_to_unicode(self, mock):
+ def test_argv_to_unicode(self):
if 'argv' not in dir(self):
return
- mock.encoding = self.io_encoding
+ mock_stdout = MockStdout()
+ mock_stdout.encoding = self.io_encoding
+ self.patch(sys, 'stdout', mock_stdout)
+
argu = lumiere_nfc
argv = self.argv
_reload()
def test_unicode_to_url(self):
self.failUnless(unicode_to_url(lumiere_nfc), "lumi\xc3\xa8re")
- @patch('sys.stdout')
- def test_unicode_to_output(self, mock):
+ def test_unicode_to_output(self):
if 'argv' not in dir(self):
return
- mock.encoding = self.io_encoding
+ mock_stdout = MockStdout()
+ mock_stdout.encoding = self.io_encoding
+ self.patch(sys, 'stdout', mock_stdout)
+
_reload()
self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), self.argv)
_reload()
self.failUnlessReallyEqual(unicode_platform(), matrix[self.platform])
- @patch('sys.getfilesystemencoding')
- @patch('os.listdir')
- def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
+ def test_listdir_unicode(self):
if 'dirlist' not in dir(self):
return
"that we are testing for the benefit of a different platform."
% (self.filesystem_encoding,))
- mock_listdir.return_value = self.dirlist
- mock_getfilesystemencoding.return_value = self.filesystem_encoding
+ def call_os_listdir(path):
+ return self.dirlist
+ self.patch(os, 'listdir', call_os_listdir)
+
+ def call_sys_getfilesystemencoding():
+ return self.filesystem_encoding
+ self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
_reload()
filenames = listdir_unicode(u'/dummy')
+
import random
from twisted.trial import unittest
from twisted.internet import defer
-import mock
from foolscap.api import eventually
from allmydata.test import common
from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
+
+class MockShareHashTree(object):
+    def needed_hashes(self, *args, **kwargs):
+ return False
+
class MockNode(object):
def __init__(self, check_reneging, check_fetch_failed):
self.got = 0
self.check_fetch_failed = check_fetch_failed
self._si_prefix='aa'
self.have_UEB = True
- self.share_hash_tree = mock.Mock()
- self.share_hash_tree.needed_hashes.return_value = False
+ self.share_hash_tree = MockShareHashTree()
self.on_want_more_shares = None
def when_finished(self):
rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
vcap = rcap.get_verify_cap()
+ class MockBuckets(object):
+ pass
+
class MockServer(object):
def __init__(self, buckets):
self.version = {
self.s.hungry()
eventually(_give_buckets_and_hunger_again)
return d
+
class MockIServer(object):
def __init__(self, serverid, rref):
self.serverid = serverid
def get_version(self):
return self.rref.version
- mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
+ class MockStorageBroker(object):
+ def __init__(self, servers):
+ self.servers = servers
+ def get_servers_for_psi(self, si):
+ return self.servers
+
+ class MockDownloadStatus(object):
+ def add_dyhb_request(self, server, when):
+ return MockDYHBEvent()
+
+ class MockDYHBEvent(object):
+ def finished(self, shnums, when):
+ pass
+
+ mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
mockserver2 = MockServer({})
- mockserver3 = MockServer({3: mock.Mock()})
- mockstoragebroker = mock.Mock()
+ mockserver3 = MockServer({3: MockBuckets()})
servers = [ MockIServer("ms1", mockserver1),
MockIServer("ms2", mockserver2),
MockIServer("ms3", mockserver3), ]
- mockstoragebroker.get_servers_for_psi.return_value = servers
- mockdownloadstatus = mock.Mock()
+ mockstoragebroker = MockStorageBroker(servers)
+ mockdownloadstatus = MockDownloadStatus()
mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)
from twisted.trial import unittest
+from twisted.python.monkey import MonkeyPatcher
import allmydata
-import mock
+import __builtin__
-real_import_func = __import__
class T(unittest.TestCase):
- @mock.patch('__builtin__.__import__')
- def test_report_import_error(self, mockimport):
+ def test_report_import_error(self):
+ real_import_func = __import__
def raiseIE_from_this_particular_func(name, *args):
if name == "foolscap":
marker = "wheeeyo"
else:
return real_import_func(name, *args)
- mockimport.side_effect = raiseIE_from_this_particular_func
+ # Let's run as little code as possible with __import__ patched.
+ patcher = MonkeyPatcher((__builtin__, '__import__', raiseIE_from_this_particular_func))
+ vers_and_locs = patcher.runWithPatches(allmydata.get_package_versions_and_locations)
- vers_and_locs = allmydata.get_package_versions_and_locations()
for (pkgname, stuff) in vers_and_locs:
if pkgname == 'foolscap':
self.failUnless('wheeeyo' in str(stuff[2]), stuff)
import os, stat, sys, time
+
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
-from mock import patch
-
from foolscap.api import flushEventualQueue
+import foolscap.logging.log
+
from twisted.application import service
from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
from allmydata.util import fileutil
+from allmydata.util.namespace import Namespace
import allmydata.test.common_util as testutil
+
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
pass
bits = stat.S_IMODE(st[stat.ST_MODE])
self.failUnless(bits & 0001 == 0, bits)
- @patch("foolscap.logging.log.setLogDir")
- def test_logdir_is_str(self, mock_setLogDir):
+ def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
fileutil.make_dirs(basedir)
+ ns = Namespace()
+ ns.called = False
def call_setLogDir(logdir):
+ ns.called = True
self.failUnless(isinstance(logdir, str), logdir)
- mock_setLogDir.side_effect = call_setLogDir
+ self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
TestNode(basedir)
- self.failUnless(mock_setLogDir.called)
+ self.failUnless(ns.called)
from cStringIO import StringIO
import thread
-import mock
from twisted.trial import unittest
from twisted.internet import defer
d = defer.succeed(None)
d.addCallback(lambda ign: load_immutable_disk_share(final))
def _got_share(share):
- mockstorageserver = mock.Mock()
+ class MockStorageServer(object):
+ def add_latency(self, category, latency):
+ pass
+ def count(self, name, delta=1):
+ pass
+
+ mockstorageserver = MockStorageServer()
account = FakeAccount(mockstorageserver)
# Now read from it.
return d
+class MockSignedJwtAssertionCredentials(object):
+    # minimal stand-in for oauth2client's SignedJwtAssertionCredentials
+    def __init__(self, *args, **kwargs):
+        pass
class GoogleStorageAuthenticationClient(unittest.TestCase):
"""
"""
When AuthenticationClient() is created, it refreshes its access token.
"""
- from oauth2client.client import SignedJwtAssertionCredentials
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
- _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
+ _credentialsClass=MockSignedJwtAssertionCredentials,
_deferToThread=defer.maybeDeferred)
self.failUnlessEqual(auth._credentials.refresh.call_count, 1)
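Note that this assertion expects _credentials.refresh to expose a call_count, which the bare MockSignedJwtAssertionCredentials stand-in above does not provide. Assuming AuthenticationClient instantiates _credentialsClass(...) and then calls its refresh method, one mock-free way to satisfy it would be a recording subclass; the names below (CallRecorder, CountingCreds) are illustrative and not part of the patch:

class CallRecorder(object):
    # callable stub that counts its invocations, mimicking a Mock's call_count
    def __init__(self):
        self.call_count = 0
    def __call__(self, *args, **kwargs):
        self.call_count += 1

class CountingCreds(MockSignedJwtAssertionCredentials):
    def __init__(self, *args, **kwargs):
        MockSignedJwtAssertionCredentials.__init__(self, *args, **kwargs)
        self.refresh = CallRecorder()

# Tests that assert on refresh.call_count would then pass
# _credentialsClass=CountingCreds instead of the bare stand-in.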
AuthenticationClient.get_authorization_header() refreshes its
credentials if the access token has expired.
"""
- from oauth2client.client import SignedJwtAssertionCredentials
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
- _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
+ _credentialsClass=MockSignedJwtAssertionCredentials,
_deferToThread=defer.maybeDeferred)
auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
auth._credentials.access_token_expired = True
-        from oauth2client.client import SignedJwtAssertionCredentials
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
- _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
+ _credentialsClass=MockSignedJwtAssertionCredentials,
_deferToThread=defer.maybeDeferred)
auth._credentials.apply = lambda d: d.__setitem__('Authorization', 'xxx')
auth._credentials.access_token_expired = False
used for the Authorization header, which is ASCII-encoded if
necessary.
"""
- from oauth2client.client import SignedJwtAssertionCredentials
- class NoNetworkCreds(SignedJwtAssertionCredentials):
+ class NoNetworkCreds(MockSignedJwtAssertionCredentials):
def refresh(self, http):
self.access_token = u"xxx"
+
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
_credentialsClass=NoNetworkCreds,
def fakeDeferToThread(f, *args):
return results.pop(0)
- from oauth2client.client import SignedJwtAssertionCredentials
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
- _credentialsClass=mock.create_autospec(SignedJwtAssertionCredentials),
+ _credentialsClass=MockSignedJwtAssertionCredentials,
_deferToThread=fakeDeferToThread)
# Initial authorization call happens...
self.failUnlessEqual(len(results), 1)
httplib2.Http instance.
"""
from httplib2 import Http
- from oauth2client.client import SignedJwtAssertionCredentials
- class NoNetworkCreds(SignedJwtAssertionCredentials):
+ class NoNetworkCreds(MockSignedJwtAssertionCredentials):
def refresh(cred_self, http):
cred_self.access_token = "xxx"
self.failUnlessIsInstance(http, Http)
self.thread_id = thread.get_ident()
+
auth = googlestorage_container.AuthenticationClient(
"u@example.com", "xxx123",
_credentialsClass=NoNetworkCreds)
Deferred which can be fired by the caller.
"""
d = defer.Deferred()
- self.container._http_request = mock.create_autospec(
- self.container._http_request, return_value=d)
+        def call_http_request(what, method, url, request_headers, body=None, need_response_body=False):
+ return d
+ self.container._http_request = call_http_request
return d
self.reactor = Clock()
self.container = CommonContainerMixin("container", self.reactor)
- # We don't just use mock.Mock, but do this silly thing so we can use
- # create_autospec, because create_autospec is the only safe way to use
- # mock.
- self.container._http_request = (lambda description, method, url, headers,
- body=None, need_response_body=False: None)
+        def call_http_request(what, method, url, request_headers, body=None, need_response_body=False):
+ return None
+ self.container._http_request = call_http_request
def test_retry_response_code(self):
"""
d.addCallback(_allocated2)
return d
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_reserved_space(self, mock_get_disk_stats):
- reserved_space=10000
- mock_get_disk_stats.return_value = {
- 'free_for_nonroot': 15000,
- 'avail': max(15000 - reserved_space, 0),
+ def test_reserved_space(self):
+ ns = Namespace()
+ ns.reserved = 10000
+ ns.allocated = 0
+
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ self.failUnlessEqual(reserved_space, ns.reserved)
+ return {
+ 'free_for_nonroot': 15000 - ns.allocated,
+                'avail': max(15000 - ns.allocated - ns.reserved, 0),
}
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
- server = self.create("test_reserved_space", reserved_space=reserved_space)
+ server = self.create("test_reserved_space", reserved_space=ns.reserved)
aa = server.get_accountant().get_anonymous_account()
# 15k available, 10k reserved, leaves 5k for shares
return d3
d2.addCallback(_allocated2)
- allocated = 1001 + OVERHEAD + LEASE_SIZE
-
- # we have to manually increase available, since we're not doing real
- # disk measurements
- def _mock(ign):
- mock_get_disk_stats.return_value = {
- 'free_for_nonroot': 15000 - allocated,
- 'avail': max(15000 - allocated - reserved_space, 0),
- }
- d2.addCallback(_mock)
+ def _change_allocated(ign):
+ # this also changes the amount reported as available by call_get_disk_stats
+ ns.allocated = 1001 + OVERHEAD + LEASE_SIZE
+ d2.addCallback(_change_allocated)
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
d.addCallback(_check_json)
return d
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_no_disk_stats(self, mock_get_disk_stats):
- mock_get_disk_stats.side_effect = AttributeError()
+ def test_status_no_disk_stats(self):
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ raise AttributeError()
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# Some platforms may have no disk stats API. Make sure the code can handle that
# (test runs on all platforms).
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnless(server.get_available_space() is None)
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_bad_disk_stats(self, mock_get_disk_stats):
- mock_get_disk_stats.side_effect = OSError()
+ def test_status_bad_disk_stats(self):
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ raise OSError()
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# If the API to get disk stats exists but a call to it fails, then the status should
# show that no shares will be accepted, and get_available_space() should be 0.
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnlessEqual(server.get_available_space(), 0)
- @mock.patch('allmydata.util.fileutil.get_disk_stats')
- def test_status_right_disk_stats(self, mock_get_disk_stats):
+ def test_status_right_disk_stats(self):
GB = 1000000000
total = 5*GB
free_for_root = 4*GB
free_for_nonroot = 3*GB
- reserved_space = 1*GB
- used = total - free_for_root
- avail = max(free_for_nonroot - reserved_space, 0)
- mock_get_disk_stats.return_value = {
- 'total': total,
- 'free_for_root': free_for_root,
- 'free_for_nonroot': free_for_nonroot,
- 'used': used,
- 'avail': avail,
- }
+ reserved = 1*GB
server = self.create("test_status_right_disk_stats", reserved_space=GB)
expecteddir = server.backend._sharedir
+ def call_get_disk_stats(whichdir, reserved_space=0):
+ self.failUnlessEqual(whichdir, expecteddir)
+ self.failUnlessEqual(reserved_space, reserved)
+ used = total - free_for_root
+ avail = max(free_for_nonroot - reserved_space, 0)
+ return {
+ 'total': total,
+ 'free_for_root': free_for_root,
+ 'free_for_nonroot': free_for_nonroot,
+ 'used': used,
+ 'avail': avail,
+ }
+ self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
+
w = StorageStatus(server)
html = w.renderSynchronously()
- self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
- (mock_get_disk_stats.call_args_list, expecteddir, reserved_space))
-
self.failUnlessIn("<h1>Storage Server Status</h1>", html)
s = remove_tags(html)
self.failUnlessIn("Total disk space: 5.00 GB", s)
import allmydata.web
-import mock
-
# junk to appease pyflakes's outrage
[
accessors, appserver, static, rend, url, util, query, i18n, flat, guard, stan, testutil,
context, flatmdom, flatstan, twist, webform, processors, annotate, iformless, Decimal,
- minidom, allmydata, mock,
+ minidom, allmydata,
]
from allmydata.scripts import runner