From: David-Sarah Hopwood Date: Wed, 20 Feb 2013 06:21:25 +0000 (+0000) Subject: OpenStack: generalize to support multiple auth protocols, and add V2 protocol. X-Git-Url: https://git.rkrishnan.org/Site/Content/Exhibitors/class-simplejson.JSONDecoder.html?a=commitdiff_plain;h=84bb2cb6b9240b0d6a984a28203c06748d8301ce;p=tahoe-lafs%2Ftahoe-lafs.git OpenStack: generalize to support multiple auth protocols, and add V2 protocol. Signed-off-by: David-Sarah Hopwood --- diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py index 45a7d5b8..a86ac5fe 100644 --- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py +++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py @@ -10,10 +10,9 @@ from twisted.internet.protocol import Protocol from twisted.web.client import Agent, FileBodyProducer, ResponseDone from twisted.web.http_headers import Headers -from zope.interface import implements +from zope.interface import implements, Interface from allmydata.util import log -from allmydata.util.assertutil import _assert from allmydata.node import InvalidValueError from allmydata.storage.backends.cloud.cloud_common import IContainer, \ CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin @@ -22,10 +21,12 @@ from allmydata.storage.backends.cloud.cloud_common import IContainer, \ # Enabling this will cause secrets to be logged. UNSAFE_DEBUG = False +#AUTH_PATH = "v1.0" +AUTH_PATH = "v2.0/tokens" DEFAULT_AUTH_URLS = { - "rackspace.com": "https://identity.api.rackspacecloud.com/v1.0", - "rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/v1.0", + "rackspace.com": "https://identity.api.rackspacecloud.com/" + AUTH_PATH, + "rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/" + AUTH_PATH, } USER_AGENT = "Tahoe-LAFS OpenStack client" @@ -42,22 +43,17 @@ def configure_openstack_container(storedir, config): container_name = config.get_config("storage", "openstack.container") reauth_period = 23*60*60 #seconds - auth_client = AuthenticationClient(api_key, provider, auth_service_url, username, reauth_period) + AuthenticatorClass = {"v1.0": AuthenticatorV1, "v2.0/tokens": AuthenticatorV2}[AUTH_PATH] + authenticator = AuthenticatorClass(auth_service_url, username, api_key) + auth_client = AuthenticationClient(authenticator, reauth_period) return OpenStackContainer(auth_client, container_name) -class UnexpectedAuthenticationResponse(Exception): - def __init__(self, msg, response_code, response_headers): - Exception.__init__(self, msg) - self.response_code = response_code - self.response_headers = response_headers - - class AuthenticationInfo(object): - def __init__(self, storage_url, cdn_management_url, auth_token): - self.storage_url = storage_url - self.cdn_management_url = cdn_management_url + def __init__(self, auth_token, public_storage_url, internal_storage_url=None): self.auth_token = auth_token + self.public_storage_url = public_storage_url + self.internal_storage_url = internal_storage_url def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False): @@ -104,28 +100,123 @@ def _get_header(response, name): return hs[0] -class AuthenticationClient(object): +class IAuthenticator(Interface): + def make_auth_request(): + """Returns (method, url, headers, body, need_response_body).""" + + def parse_auth_response(response, body): + """Returns AuthenticationInfo.""" + + +class AuthenticatorV1(object): + implements(IAuthenticator) """ - I implement a client for the Rackspace authentication service. - It is not clear whether this is also implemented by other OpenStack providers. + Authenticates according to V1 protocol as documented by Rackspace: + . """ - def __init__(self, api_key, provider, auth_service_url, username, reauth_period, override_reactor=None): + + def __init__(self, auth_service_url, username, api_key): + self._auth_service_url = auth_service_url + self._username = username self._api_key = api_key + + def make_auth_request(self): + request_headers = { + 'X-Auth-User': [self._username], + 'X-Auth-Key': [self._api_key], + } + return ('GET', self._auth_service_url, request_headers, None, False) + + def parse_auth_response(self, response, body): + auth_token = _get_header(response, 'X-Auth-Token') + storage_url = _get_header(response, 'X-Storage-Url') + #cdn_management_url = _get_header(response, 'X-CDN-Management-Url') + return AuthenticationInfo(auth_token, storage_url) + + +class AuthenticatorV2(object): + implements(IAuthenticator) + """ + Authenticates according to V2 protocol as documented by Rackspace: + . + """ + + def __init__(self, auth_service_url, username, api_key): self._auth_service_url = auth_service_url self._username = username + self._api_key = api_key + #self._password = password + + def make_auth_request(self): + # I suspect that 'RAX-KSKEY:apiKeyCredentials' is Rackspace-specific. + request = { + 'auth': { + # 'passwordCredentials': { + # 'username': self._username, + # 'password': self._password, + # } + 'RAX-KSKEY:apiKeyCredentials': { + 'username': self._username, + 'apiKey': self._api_key, + } + } + } + json = simplejson.dumps(request) + request_headers = { + 'Content-Type': ['application/json'], + } + return ('POST', self._auth_service_url, request_headers, json, True) + + def parse_auth_response(self, response, body): + try: + decoded_body = simplejson.loads(body) + except simplejson.decoder.JSONDecodeError, e: + raise CloudServiceError(None, response.code, + message="could not decode auth response: %s" % (e,)) + + try: + # Scrabble around in the annoyingly complicated response body for the credentials we need. + access = decoded_body['access'] + token = access['token'] + auth_token = token['id'] + + user = access['user'] + default_region = user.get('RAX-AUTH:defaultRegion', '') + + serviceCatalog = access['serviceCatalog'] + for service in serviceCatalog: + if service['type'] == 'object-store': + endpoints = service['endpoints'] + for endpoint in endpoints: + if not default_region or endpoint['region'] == default_region: + public_storage_url = endpoint['publicURL'] + internal_storage_url = endpoint['internalURL'] + return AuthenticationInfo(auth_token, public_storage_url, internal_storage_url) + except KeyError, e: + raise CloudServiceError(None, response.code, + message="missing field in auth response: %s" % (e,)) + + raise CloudServiceError(None, response.code, + message="could not find a suitable storage endpoint in auth response") + + +class AuthenticationClient(object): + """ + I implement a generic authentication client. + The construction of the auth request and parsing of the response is delegated to an authenticator. + """ + + def __init__(self, authenticator, reauth_period, override_reactor=None): + self._authenticator = authenticator self._reauth_period = reauth_period self._reactor = override_reactor or reactor self._agent = Agent(self._reactor) - self._delayed = None - - _assert(provider.startswith("rackspace"), provider=provider) - self._authenticate = self._authenticate_to_rackspace + self._shutdown = False # Not authorized yet. self._auth_info = None self._auth_lock = defer.DeferredLock() - d = self.get_auth_info() - d.addBoth(lambda ign: None) + self._reauthenticate() def get_auth_info(self): # It is intentional that this returns the previous auth_info while a reauthentication is in progress. @@ -137,44 +228,21 @@ class AuthenticationClient(object): def get_auth_info_locked(self, suppress_errors=False): d = self._auth_lock.run(self._authenticate) d.addCallback(lambda ign: self._auth_info) - if suppress_errors: - d.addErrback(lambda ign: self._auth_info) return d - def _authenticate_to_rackspace(self, ign=None): - # + def _authenticate(self): + (method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request() - # Agent.request adds a Host header automatically based on the URL. - request_headers = { - 'User-Agent': ['Tahoe-LAFS authentication client'], - 'X-Auth-User': [self._username], - 'X-Auth-Key': [self._api_key], - } - log.msg(format="OpenStack auth GET %(url)s %(headers)s", - url=self._auth_service_url, headers=repr(request_headers), level=log.OPERATIONAL) - d = defer.succeed(None) - d.addCallback(lambda ign: self._agent.request('GET', self._auth_service_url, Headers(request_headers), None)) - - def _got_response(response): - log.msg(format="OpenStack auth response: %(code)d %(phrase)s", - code=response.code, phrase=response.phrase, level=log.OPERATIONAL) - _check_response_code(response) - - def _get_header(name): - hs = response.headers.getRawHeaders(name) - if len(hs) == 0: - raise UnexpectedAuthenticationResponse("missing response header %r" % (name,), - response.code, response.headers) - return hs[0] - - storage_url = _get_header('X-Storage-Url') - cdn_management_url = _get_header('X-CDN-Management-Url') - auth_token = _get_header('X-Auth-Token') + d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body) + def _got_response( (response, body) ): + self._auth_info = self._authenticator.parse_auth_response(response, body) if UNSAFE_DEBUG: - print "Auth response is %s %s %s" % (storage_url, cdn_management_url, auth_token) - self._auth_info = AuthenticationInfo(storage_url, cdn_management_url, auth_token) + print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url) - self._delayed = self._reactor.callLater(self._reauth_period, self.get_auth_info_locked, suppress_errors=True) + if not self._shutdown: + if self._delayed: + self._delayed.cancel() + self._delayed = self._reactor.callLater(self._reauth_period, self._reauthenticate) d.addCallback(_got_response) def _failed(f): self._auth_info = None @@ -184,8 +252,15 @@ class AuthenticationClient(object): d.addErrback(_failed) return d + def _reauthenticate(self): + self._delayed = None + d = self.get_auth_info_locked() + d.addBoth(lambda ign: None) + return d + def shutdown(self): """Used by unit tests to avoid unclean reactor errors.""" + self._shutdown = True if self._delayed: self._delayed.cancel() @@ -208,7 +283,7 @@ class DataCollector(Protocol): if reason.check(ResponseDone): eventually_callback(self._done)("".join(self._data)) else: - def _failed(): raise CloudServiceError(reason.getErrorMessage()) + def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage()) eventually_errback(self._done)(defer.execute(_failed)) def when_done(self): @@ -230,10 +305,11 @@ class OpenStackContainer(ContainerRetryMixin): return ("<%s %r>" % (self.__class__.__name__, self._container_name,)) def _make_container_url(self, auth_info): - return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe='')) + return "%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe='')) def _make_object_url(self, auth_info, object_name): - return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name)) + return "%s/%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe=''), + urllib.quote(object_name)) def _create(self): """ @@ -329,7 +405,19 @@ class OpenStackContainer(ContainerRetryMixin): """ Retrieve object metadata only. """ - raise NotImplementedError + d = self._auth_client.get_auth_info() + def _do_head(auth_info): + request_headers = { + 'X-Auth-Token': [auth_info.auth_token], + } + url = self._make_object_url(auth_info, object_name) + return _http_request("head object", self._agent, 'HEAD', url, request_headers) + d.addCallback(_do_head) + def _got_head_response( (response, body) ): + print response + raise NotImplementedError + d.addCallback(_got_head_response) + return d def _delete_object(self, object_name): """ diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 7eb58f4f..e221147d 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -347,9 +347,11 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): "s3.bucket = test\n") self.failUnlessRaises(MissingConfigEntry, client.Client, basedir) + @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticatorV2') @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.AuthenticationClient') @mock.patch('allmydata.storage.backends.cloud.openstack.openstack_container.OpenStackContainer') - def test_openstack_config_good_defaults(self, mock_OpenStackContainer, mock_AuthenticationClient): + def test_openstack_config_good_defaults(self, mock_OpenStackContainer, mock_AuthenticationClient, + mock_Authenticator): basedir = "client.Basic.test_openstack_config_good_defaults" os.mkdir(basedir) self._write_secret(basedir, "openstack_api_key") @@ -363,9 +365,11 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): fileutil.write(os.path.join(basedir, "tahoe.cfg"), config) c = client.Client(basedir) - mock_AuthenticationClient.assert_called_with("dummy", "rackspace.com", - "https://identity.api.rackspacecloud.com/v1.0", - "alex", 23*60*60) + mock_Authenticator.assert_called_with("https://identity.api.rackspacecloud.com/v2.0/tokens", + "alex", "dummy") + authclient_call_args = mock_AuthenticationClient.call_args_list + self.failUnlessEqual(len(authclient_call_args), 1) + self.failUnlessEqual(authclient_call_args[0][0][1:], (23*60*60,)) container_call_args = mock_OpenStackContainer.call_args_list self.failUnlessEqual(len(container_call_args), 1) self.failUnlessEqual(container_call_args[0][0][1:], ("test",)) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 8a00f8b8..1f43ffff 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -413,8 +413,8 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u USERNAME = "username" CONTAINER = "container" API_KEY = "api_key" - STORAGE_URL = "https://storage.example/a" - CDN_MANAGEMENT_URL = "https://cdn.example/a" + PUBLIC_STORAGE_URL = "https://public.storage.example/a" + INTERNAL_STORAGE_URL = "https://internal.storage.example/a" AUTH_TOKEN = "auth_token" TEST_SHARE_PREFIX = "shares/te/" @@ -450,17 +450,18 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u self.failUnlessIsInstance(headers, Headers) for (key, values) in expected_headers.iteritems(): - self.failUnlessEqual(headers.getRawHeaders(key), values) + self.failUnlessEqual(headers.getRawHeaders(key), values, str((headers, key))) d = defer.succeed(None) if bodyProducer is None: self.failUnlessEqual(expected_body, "") else: self.failUnless(IBodyProducer.providedBy(bodyProducer)) - self.failUnlessIn(bodyProducer.length, (len(expected_body), UNKNOWN_LENGTH)) body = StringIO() d = bodyProducer.startProducing(FileConsumer(body)) d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body)) + d.addCallback(lambda ign: self.failUnlessIn(bodyProducer.length, + (len(expected_body), UNKNOWN_LENGTH))) d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body)) return d @@ -474,15 +475,27 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u response_code, response_phrase, response_headers, response_body) def _make_server(self, name): - self._set_request('GET', self.AUTH_SERVICE_URL, { - 'X-Auth-User': [self.USERNAME], - 'X-Auth-Key': [self.API_KEY], - }, "", - 204, "No Content", { - 'X-Storage-Url': [self.STORAGE_URL], - 'X-CDN-Management-Url': [self.CDN_MANAGEMENT_URL], - 'X-Auth-Token': [self.AUTH_TOKEN], - }, "") + # This is for the v1 auth protocol. + #self._set_request('GET', self.AUTH_SERVICE_URL, { + # 'X-Auth-User': [self.USERNAME], + # 'X-Auth-Key': [self.API_KEY], + # }, "", + # 204, "No Content", { + # 'X-Storage-Url': [self.STORAGE_URL], + # 'X-Auth-Token': [self.AUTH_TOKEN], + # }, "") + + self._set_request('POST', self.AUTH_SERVICE_URL, { + 'Content-Type': ['application/json'], + }, '{"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": "username", "apiKey": "api_key"}}}', + 200, "OK", { + }, ''' + {"access": {"token": {"id": "%s"}, + "serviceCatalog": [{"endpoints": [{"region": "FOO", "publicURL": "%s", "internalURL": "%s"}], + "type": "object-store"}], + "user": {"RAX-AUTH:defaultRegion": "", "name": "%s"} + } + }''' % (self.AUTH_TOKEN, self.PUBLIC_STORAGE_URL, self.INTERNAL_STORAGE_URL, self.USERNAME)) storage_config = { 'openstack.provider': self.PROVIDER, @@ -527,8 +540,8 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u d = self.container._auth_client.get_auth_info() def _check(auth_info): - self.failUnlessEqual(auth_info.storage_url, self.STORAGE_URL) - self.failUnlessEqual(auth_info.cdn_management_url, self.CDN_MANAGEMENT_URL) + self.failUnlessEqual(auth_info.public_storage_url, self.PUBLIC_STORAGE_URL) + self.failUnlessEqual(auth_info.internal_storage_url, self.INTERNAL_STORAGE_URL) self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN) d.addCallback(_check) d.addBoth(self._shutdown) @@ -538,26 +551,26 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u self._patch_agent() # Set up the requests that we expect to receive. - self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, "unexpected")), { + self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, "unexpected")), { 'X-Auth-Token': [self.AUTH_TOKEN], }, "", 404, "Not Found", {}, "") - self._set_request('PUT', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { + self._set_request('PUT', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { 'X-Auth-Token': [self.AUTH_TOKEN], 'Content-Type': ['application/octet-stream'], - 'Content-Length': [len(self.TEST_SHARE_DATA)], + #'Content-Length': [len(self.TEST_SHARE_DATA)], }, self.TEST_SHARE_DATA, 204, "No Content", {}, "") quoted_prefix = urllib.quote(self.TEST_SHARE_PREFIX, safe='') self._set_request('GET', "%s/%s?format=json&prefix=%s" - % (self.STORAGE_URL, self.CONTAINER, quoted_prefix), { + % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), { 'X-Auth-Token': [self.AUTH_TOKEN], }, "", 200, "OK", {}, self.TEST_LISTING_JSON) - self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { + self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { 'X-Auth-Token': [self.AUTH_TOKEN], }, "", 200, "OK", {}, self.TEST_SHARE_DATA) @@ -588,14 +601,14 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u d.addCallback(lambda res: self.failUnlessEqual(res, self.TEST_SHARE_DATA)) def _set_up_delete(ign): - self._set_request('DELETE', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { + self._set_request('DELETE', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), { 'X-Auth-Token': [self.AUTH_TOKEN], }, "", 204, "No Content", {}, "") # this changes the response to the request set up above self._set_request('GET', "%s/%s?format=json&prefix=%s" - % (self.STORAGE_URL, self.CONTAINER, quoted_prefix), { + % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), { 'X-Auth-Token': [self.AUTH_TOKEN], }, "", 200, "OK", {}, "[]")