from twisted.web.client import Agent, FileBodyProducer, ResponseDone
from twisted.web.http_headers import Headers
-from zope.interface import implements
+from zope.interface import implements, Interface
from allmydata.util import log
-from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin
# Enabling this will cause secrets to be logged.
UNSAFE_DEBUG = False
+#AUTH_PATH = "v1.0"
+AUTH_PATH = "v2.0/tokens"
DEFAULT_AUTH_URLS = {
- "rackspace.com": "https://identity.api.rackspacecloud.com/v1.0",
- "rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/v1.0",
+ "rackspace.com": "https://identity.api.rackspacecloud.com/" + AUTH_PATH,
+ "rackspace.co.uk": "https://lon.identity.api.rackspacecloud.com/" + AUTH_PATH,
}
USER_AGENT = "Tahoe-LAFS OpenStack client"
container_name = config.get_config("storage", "openstack.container")
reauth_period = 23*60*60 #seconds
- auth_client = AuthenticationClient(api_key, provider, auth_service_url, username, reauth_period)
+ AuthenticatorClass = {"v1.0": AuthenticatorV1, "v2.0/tokens": AuthenticatorV2}[AUTH_PATH]
+ authenticator = AuthenticatorClass(auth_service_url, username, api_key)
+ auth_client = AuthenticationClient(authenticator, reauth_period)
return OpenStackContainer(auth_client, container_name)
-class UnexpectedAuthenticationResponse(Exception):
- def __init__(self, msg, response_code, response_headers):
- Exception.__init__(self, msg)
- self.response_code = response_code
- self.response_headers = response_headers
-
-
class AuthenticationInfo(object):
- def __init__(self, storage_url, cdn_management_url, auth_token):
- self.storage_url = storage_url
- self.cdn_management_url = cdn_management_url
+ def __init__(self, auth_token, public_storage_url, internal_storage_url=None):
self.auth_token = auth_token
+ self.public_storage_url = public_storage_url
+ self.internal_storage_url = internal_storage_url
def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False):
return hs[0]
-class AuthenticationClient(object):
+class IAuthenticator(Interface):
+ def make_auth_request():
+ """Returns (method, url, headers, body, need_response_body)."""
+
+ def parse_auth_response(response, body):
+ """Returns AuthenticationInfo."""
+
+
+class AuthenticatorV1(object):
+ implements(IAuthenticator)
"""
- I implement a client for the Rackspace authentication service.
- It is not clear whether this is also implemented by other OpenStack providers.
+ Authenticates according to V1 protocol as documented by Rackspace:
+ <http://docs.rackspace.com/files/api/v1/cf-devguide/content/Authentication-d1e639.html>.
"""
- def __init__(self, api_key, provider, auth_service_url, username, reauth_period, override_reactor=None):
+
+ def __init__(self, auth_service_url, username, api_key):
+ self._auth_service_url = auth_service_url
+ self._username = username
self._api_key = api_key
+
+ def make_auth_request(self):
+ request_headers = {
+ 'X-Auth-User': [self._username],
+ 'X-Auth-Key': [self._api_key],
+ }
+ return ('GET', self._auth_service_url, request_headers, None, False)
+
+ def parse_auth_response(self, response, body):
+ auth_token = _get_header(response, 'X-Auth-Token')
+ storage_url = _get_header(response, 'X-Storage-Url')
+ #cdn_management_url = _get_header(response, 'X-CDN-Management-Url')
+ return AuthenticationInfo(auth_token, storage_url)
+
+
+class AuthenticatorV2(object):
+ implements(IAuthenticator)
+ """
+ Authenticates according to V2 protocol as documented by Rackspace:
+ <http://docs.rackspace.com/auth/api/v2.0/auth-client-devguide/content/POST_authenticate_v2.0_tokens_.html>.
+ """
+
+ def __init__(self, auth_service_url, username, api_key):
self._auth_service_url = auth_service_url
self._username = username
+ self._api_key = api_key
+ #self._password = password
+
+ def make_auth_request(self):
+ # I suspect that 'RAX-KSKEY:apiKeyCredentials' is Rackspace-specific.
+ request = {
+ 'auth': {
+ # 'passwordCredentials': {
+ # 'username': self._username,
+ # 'password': self._password,
+ # }
+ 'RAX-KSKEY:apiKeyCredentials': {
+ 'username': self._username,
+ 'apiKey': self._api_key,
+ }
+ }
+ }
+ json = simplejson.dumps(request)
+ request_headers = {
+ 'Content-Type': ['application/json'],
+ }
+ return ('POST', self._auth_service_url, request_headers, json, True)
+
+ def parse_auth_response(self, response, body):
+ try:
+ decoded_body = simplejson.loads(body)
+ except simplejson.decoder.JSONDecodeError, e:
+ raise CloudServiceError(None, response.code,
+ message="could not decode auth response: %s" % (e,))
+
+ try:
+ # Scrabble around in the annoyingly complicated response body for the credentials we need.
+ access = decoded_body['access']
+ token = access['token']
+ auth_token = token['id']
+
+ user = access['user']
+ default_region = user.get('RAX-AUTH:defaultRegion', '')
+
+ serviceCatalog = access['serviceCatalog']
+ for service in serviceCatalog:
+ if service['type'] == 'object-store':
+ endpoints = service['endpoints']
+ for endpoint in endpoints:
+ if not default_region or endpoint['region'] == default_region:
+ public_storage_url = endpoint['publicURL']
+ internal_storage_url = endpoint['internalURL']
+ return AuthenticationInfo(auth_token, public_storage_url, internal_storage_url)
+ except KeyError, e:
+ raise CloudServiceError(None, response.code,
+ message="missing field in auth response: %s" % (e,))
+
+ raise CloudServiceError(None, response.code,
+ message="could not find a suitable storage endpoint in auth response")
+
+
+class AuthenticationClient(object):
+ """
+ I implement a generic authentication client.
+ The construction of the auth request and parsing of the response is delegated to an authenticator.
+ """
+
+ def __init__(self, authenticator, reauth_period, override_reactor=None):
+ self._authenticator = authenticator
self._reauth_period = reauth_period
self._reactor = override_reactor or reactor
self._agent = Agent(self._reactor)
- self._delayed = None
-
- _assert(provider.startswith("rackspace"), provider=provider)
- self._authenticate = self._authenticate_to_rackspace
+ self._shutdown = False
# Not authorized yet.
self._auth_info = None
self._auth_lock = defer.DeferredLock()
- d = self.get_auth_info()
- d.addBoth(lambda ign: None)
+ self._reauthenticate()
def get_auth_info(self):
# It is intentional that this returns the previous auth_info while a reauthentication is in progress.
def get_auth_info_locked(self, suppress_errors=False):
d = self._auth_lock.run(self._authenticate)
d.addCallback(lambda ign: self._auth_info)
- if suppress_errors:
- d.addErrback(lambda ign: self._auth_info)
return d
- def _authenticate_to_rackspace(self, ign=None):
- # <http://docs.rackspace.com/files/api/v1/cf-devguide/content/Authentication-d1e639.html>
+ def _authenticate(self):
+ (method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request()
- # Agent.request adds a Host header automatically based on the URL.
- request_headers = {
- 'User-Agent': ['Tahoe-LAFS authentication client'],
- 'X-Auth-User': [self._username],
- 'X-Auth-Key': [self._api_key],
- }
- log.msg(format="OpenStack auth GET %(url)s %(headers)s",
- url=self._auth_service_url, headers=repr(request_headers), level=log.OPERATIONAL)
- d = defer.succeed(None)
- d.addCallback(lambda ign: self._agent.request('GET', self._auth_service_url, Headers(request_headers), None))
-
- def _got_response(response):
- log.msg(format="OpenStack auth response: %(code)d %(phrase)s",
- code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
- _check_response_code(response)
-
- def _get_header(name):
- hs = response.headers.getRawHeaders(name)
- if len(hs) == 0:
- raise UnexpectedAuthenticationResponse("missing response header %r" % (name,),
- response.code, response.headers)
- return hs[0]
-
- storage_url = _get_header('X-Storage-Url')
- cdn_management_url = _get_header('X-CDN-Management-Url')
- auth_token = _get_header('X-Auth-Token')
+ d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body)
+ def _got_response( (response, body) ):
+ self._auth_info = self._authenticator.parse_auth_response(response, body)
if UNSAFE_DEBUG:
- print "Auth response is %s %s %s" % (storage_url, cdn_management_url, auth_token)
- self._auth_info = AuthenticationInfo(storage_url, cdn_management_url, auth_token)
+ print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url)
- self._delayed = self._reactor.callLater(self._reauth_period, self.get_auth_info_locked, suppress_errors=True)
+ if not self._shutdown:
+ if self._delayed:
+ self._delayed.cancel()
+ self._delayed = self._reactor.callLater(self._reauth_period, self._reauthenticate)
d.addCallback(_got_response)
def _failed(f):
self._auth_info = None
d.addErrback(_failed)
return d
+ def _reauthenticate(self):
+ self._delayed = None
+ d = self.get_auth_info_locked()
+ d.addBoth(lambda ign: None)
+ return d
+
def shutdown(self):
"""Used by unit tests to avoid unclean reactor errors."""
+ self._shutdown = True
if self._delayed:
self._delayed.cancel()
if reason.check(ResponseDone):
eventually_callback(self._done)("".join(self._data))
else:
- def _failed(): raise CloudServiceError(reason.getErrorMessage())
+ def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage())
eventually_errback(self._done)(defer.execute(_failed))
def when_done(self):
return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
def _make_container_url(self, auth_info):
- return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''))
+ return "%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe=''))
def _make_object_url(self, auth_info, object_name):
- return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name))
+ return "%s/%s/%s" % (auth_info.public_storage_url, urllib.quote(self._container_name, safe=''),
+ urllib.quote(object_name))
def _create(self):
"""
"""
Retrieve object metadata only.
"""
- raise NotImplementedError
+ d = self._auth_client.get_auth_info()
+ def _do_head(auth_info):
+ request_headers = {
+ 'X-Auth-Token': [auth_info.auth_token],
+ }
+ url = self._make_object_url(auth_info, object_name)
+ return _http_request("head object", self._agent, 'HEAD', url, request_headers)
+ d.addCallback(_do_head)
+ def _got_head_response( (response, body) ):
+ print response
+ raise NotImplementedError
+ d.addCallback(_got_head_response)
+ return d
def _delete_object(self, object_name):
"""
USERNAME = "username"
CONTAINER = "container"
API_KEY = "api_key"
- STORAGE_URL = "https://storage.example/a"
- CDN_MANAGEMENT_URL = "https://cdn.example/a"
+ PUBLIC_STORAGE_URL = "https://public.storage.example/a"
+ INTERNAL_STORAGE_URL = "https://internal.storage.example/a"
AUTH_TOKEN = "auth_token"
TEST_SHARE_PREFIX = "shares/te/"
self.failUnlessIsInstance(headers, Headers)
for (key, values) in expected_headers.iteritems():
- self.failUnlessEqual(headers.getRawHeaders(key), values)
+ self.failUnlessEqual(headers.getRawHeaders(key), values, str((headers, key)))
d = defer.succeed(None)
if bodyProducer is None:
self.failUnlessEqual(expected_body, "")
else:
self.failUnless(IBodyProducer.providedBy(bodyProducer))
- self.failUnlessIn(bodyProducer.length, (len(expected_body), UNKNOWN_LENGTH))
body = StringIO()
d = bodyProducer.startProducing(FileConsumer(body))
d.addCallback(lambda ign: self.failUnlessEqual(body.getvalue(), expected_body))
+ d.addCallback(lambda ign: self.failUnlessIn(bodyProducer.length,
+ (len(expected_body), UNKNOWN_LENGTH)))
d.addCallback(lambda ign: MockResponse(response_code, response_phrase, response_headers, response_body))
return d
response_code, response_phrase, response_headers, response_body)
def _make_server(self, name):
- self._set_request('GET', self.AUTH_SERVICE_URL, {
- 'X-Auth-User': [self.USERNAME],
- 'X-Auth-Key': [self.API_KEY],
- }, "",
- 204, "No Content", {
- 'X-Storage-Url': [self.STORAGE_URL],
- 'X-CDN-Management-Url': [self.CDN_MANAGEMENT_URL],
- 'X-Auth-Token': [self.AUTH_TOKEN],
- }, "")
+ # This is for the v1 auth protocol.
+ #self._set_request('GET', self.AUTH_SERVICE_URL, {
+ # 'X-Auth-User': [self.USERNAME],
+ # 'X-Auth-Key': [self.API_KEY],
+ # }, "",
+ # 204, "No Content", {
+ # 'X-Storage-Url': [self.STORAGE_URL],
+ # 'X-Auth-Token': [self.AUTH_TOKEN],
+ # }, "")
+
+ self._set_request('POST', self.AUTH_SERVICE_URL, {
+ 'Content-Type': ['application/json'],
+ }, '{"auth": {"RAX-KSKEY:apiKeyCredentials": {"username": "username", "apiKey": "api_key"}}}',
+ 200, "OK", {
+ }, '''
+ {"access": {"token": {"id": "%s"},
+ "serviceCatalog": [{"endpoints": [{"region": "FOO", "publicURL": "%s", "internalURL": "%s"}],
+ "type": "object-store"}],
+ "user": {"RAX-AUTH:defaultRegion": "", "name": "%s"}
+ }
+ }''' % (self.AUTH_TOKEN, self.PUBLIC_STORAGE_URL, self.INTERNAL_STORAGE_URL, self.USERNAME))
storage_config = {
'openstack.provider': self.PROVIDER,
d = self.container._auth_client.get_auth_info()
def _check(auth_info):
- self.failUnlessEqual(auth_info.storage_url, self.STORAGE_URL)
- self.failUnlessEqual(auth_info.cdn_management_url, self.CDN_MANAGEMENT_URL)
+ self.failUnlessEqual(auth_info.public_storage_url, self.PUBLIC_STORAGE_URL)
+ self.failUnlessEqual(auth_info.internal_storage_url, self.INTERNAL_STORAGE_URL)
self.failUnlessEqual(auth_info.auth_token, self.AUTH_TOKEN)
d.addCallback(_check)
d.addBoth(self._shutdown)
self._patch_agent()
# Set up the requests that we expect to receive.
- self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, "unexpected")), {
+ self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, "unexpected")), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
404, "Not Found", {}, "")
- self._set_request('PUT', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
+ self._set_request('PUT', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
'Content-Type': ['application/octet-stream'],
- 'Content-Length': [len(self.TEST_SHARE_DATA)],
+ #'Content-Length': [len(self.TEST_SHARE_DATA)],
}, self.TEST_SHARE_DATA,
204, "No Content", {}, "")
quoted_prefix = urllib.quote(self.TEST_SHARE_PREFIX, safe='')
self._set_request('GET', "%s/%s?format=json&prefix=%s"
- % (self.STORAGE_URL, self.CONTAINER, quoted_prefix), {
+ % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, self.TEST_LISTING_JSON)
- self._set_request('GET', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
+ self._set_request('GET', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, self.TEST_SHARE_DATA)
d.addCallback(lambda res: self.failUnlessEqual(res, self.TEST_SHARE_DATA))
def _set_up_delete(ign):
- self._set_request('DELETE', "/".join((self.STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
+ self._set_request('DELETE', "/".join((self.PUBLIC_STORAGE_URL, self.CONTAINER, self.TEST_SHARE_NAME)), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
204, "No Content", {}, "")
# this changes the response to the request set up above
self._set_request('GET', "%s/%s?format=json&prefix=%s"
- % (self.STORAGE_URL, self.CONTAINER, quoted_prefix), {
+ % (self.PUBLIC_STORAGE_URL, self.CONTAINER, quoted_prefix), {
'X-Auth-Token': [self.AUTH_TOKEN],
}, "",
200, "OK", {}, "[]")