import time, os.path, platform, re, simplejson, struct, itertools
from collections import deque
+from cStringIO import StringIO
import mock
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet.task import Clock
from allmydata.util.deferredutil import for_items
+from twisted.web.iweb import IBodyProducer, UNKNOWN_LENGTH
from twisted.web.http_headers import Headers
+from twisted.protocols.ftp import FileConsumer
+from twisted.web.client import ResponseDone
from twisted.python.failure import Failure
from foolscap.logging.log import OPERATIONAL, INFREQUENT, WEIRD
from foolscap.logging.web import LogEvent
from allmydata import interfaces
+from allmydata.util.assertutil import precondition
from allmydata.util import fileutil, hashutil, base32, time_format
from allmydata.storage.server import StorageServer
from allmydata.storage.backends.null.null_backend import NullBackend
CDN_MANAGEMENT_URL = "cdn_management_url"
AUTH_TOKEN = "auth_token"
- def test_authentication_client(self):
- mock_response_headers = {
- 'X-Storage-Url': [self.STORAGE_URL],
- 'X-CDN-Management-Url': [self.CDN_MANAGEMENT_URL],
- 'X-Auth-Token': [self.AUTH_TOKEN],
- }
+ def _patch_agent(self):
+ self._requests = {}
+
class MockResponse(object):
- def __init__(mock_self):
- mock_self.headers = Headers(mock_response_headers)
- mock_self.code = 204
- mock_self.phrase = "No Content"
+ def __init__(mock_self, response_code, response_phrase, response_headers, response_body):
+ mock_self.code = response_code
+ mock_self.phrase = response_phrase
+ mock_self.headers = Headers(response_headers)
+ mock_self._body = response_body
+
+ def deliverBody(mock_self, protocol):
+ protocol.dataReceived(mock_self._body)
+ protocol.connectionLost(Failure(ResponseDone()))
class MockAgent(object):
def __init__(mock_self, reactor):
pass
- def request(mock_self, method, url, headers, foo):
- self.failUnlessEqual(method, 'GET')
- self.failUnlessEqual(url, self.AUTH_SERVICE_URL)
+
+ def request(mock_self, method, url, headers, bodyProducer=None):
+ self.failUnlessIn((method, url), self._requests)
+ (expected_headers, expected_body,
+ response_code, response_phrase, response_headers, response_body) = self._requests[(method, url)]
+
self.failUnlessIsInstance(headers, Headers)
- self.failUnlessEqual(headers.getRawHeaders('X-Auth-User'), [self.USERNAME])
- self.failUnlessEqual(headers.getRawHeaders('X-Auth-Key'), [self.API_KEY])
- return MockResponse()
+ for (key, values) in expected_headers.iteritems():
+ self.failUnlessEqual(headers.getRawHeaders(key), values)
+
+ d = defer.succeed(None)
+ if bodyProducer is None:
+ self.failUnlessEqual(expected_body, "")
+ else:
+ self.failUnless(IBodyProducer.providedBy(bodyProducer))
+ self.failUnlessIn(bodyProducer.length, (len(expected_body), UNKNOWN_LENGTH))
+ body = StringIO()
+ d = bodyProducer.startProducing(FileConsumer(body))
+ d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body))
+ d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body))
+ return d
self.patch(openstack_container, 'Agent', MockAgent)
- workdir = self.workdir("test_authentication_client")
- privatedir = os.path.join(workdir, "private")
- fileutil.make_dirs(privatedir)
- fileutil.write(os.path.join(privatedir, "openstack_api_key"), self.API_KEY)
+ def _set_request(self, method, url, expected_headers, expected_body,
+ response_code, response_phrase, response_headers, response_body):
+ precondition(isinstance(expected_headers, dict), expected_headers)
+ precondition(isinstance(response_headers, dict), response_headers)
+ self._requests[(method, url)] = (expected_headers, expected_body,
+ response_code, response_phrase, response_headers, response_body)
+
+ def _make_server(self, name):
+ self._set_request('GET', self.AUTH_SERVICE_URL, {
+ 'X-Auth-User': [self.USERNAME],
+ 'X-Auth-Key': [self.API_KEY],
+ }, "",
+ 204, "No Content", {
+ 'X-Storage-Url': [self.STORAGE_URL],
+ 'X-CDN-Management-Url': [self.CDN_MANAGEMENT_URL],
+ 'X-Auth-Token': [self.AUTH_TOKEN],
+ }, "")
storage_config = {
'openstack.provider': self.PROVIDER,
def get_or_create_private_config(mock_self, filename):
return fileutil.read(os.path.join(privatedir, filename))
- config = MockConfig()
- clock = Clock()
- container = openstack_container.configure_openstack_container(workdir, config)
- backend = CloudBackend(container)
- server = StorageServer("\x00" * 20, backend, workdir,
- stats_provider=FakeStatsProvider(), clock=clock)
- server.setServiceParent(self.sparent)
- self.failUnless(server.backend._container is container, (server.backend._container, container))
+ self.workdir = self.workdir(name)
+ privatedir = os.path.join(self.workdir, "private")
+ fileutil.make_dirs(privatedir)
+ fileutil.write(os.path.join(privatedir, "openstack_api_key"), self.API_KEY)
+
+ self.config = MockConfig()
+ self.clock = Clock()
+ self.container = openstack_container.configure_openstack_container(self.workdir, self.config)
+ backend = CloudBackend(self.container)
+ self.server = StorageServer("\x00" * 20, backend, self.workdir,
+ stats_provider=FakeStatsProvider(), clock=self.clock)
+ self.server.setServiceParent(self.sparent)
+ self.failUnless(self.server.backend._container is self.container,
+ (self.server.backend._container, self.container))
+
+ def _shutdown(self, res):
+ # avoid unclean reactor error
+ self.container._auth_client.shutdown()
+ return res
+
+
+ def test_authentication_client(self):
+ self._patch_agent()
+ self._make_server("test_authentication_client")
- d = container._auth_client.get_auth_info()
+ d = self.container._auth_client.get_auth_info()
def _check(auth_info):
self.failUnlessEqual(auth_info.storage_url, self.STORAGE_URL)
self.failUnlessEqual(auth_info.cdn_management_url, self.CDN_MANAGEMENT_URL)
self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN)
d.addCallback(_check)
- def _shutdown(res):
- # avoid unclean reactor error
- container._auth_client.shutdown()
- return res
- d.addBoth(_shutdown)
+ d.addBoth(self._shutdown)
return d