+from twisted.internet import defer, reactor
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+
from zope.interface import implements
+from allmydata.util import log
+from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
ContainerRetryMixin, ContainerListMixin
+DEFAULT_AUTH_URLS = {
+ "rackspace": "https://identity.api.rackspacecloud.com/v1.0",
+ "rackspace-uk": "https://lon.identity.api.rackspacecloud.com/v1.0",
+}
+
def configure_openstack_container(storedir, config):
- accesskeyid = config.get_config("storage", "s3.access_key_id")
- secretkey = config.get_or_create_private_config("s3secret")
- usertoken = config.get_optional_private_config("s3usertoken")
- producttoken = config.get_optional_private_config("s3producttoken")
- if producttoken and not usertoken:
- raise InvalidValueError("If private/s3producttoken is present, private/s3usertoken must also be present.")
- url = config.get_config("storage", "s3.url", "http://s3.amazonaws.com")
- container_name = config.get_config("storage", "s3.bucket")
+ api_key = config.get_or_create_private_config("openstack_api_key")
+ provider = config.get_config("storage", "openstack.provider", "rackspace").lower()
+ if provider not in DEFAULT_AUTH_URLS:
+ raise InvalidValueError("[storage]openstack.provider %r is not recognized\n"
+ "Valid providers are: %s" % (provider, ", ".join(DEFAULT_AUTH_URLS.keys())))
+
+ auth_service_url = config.get_config("storage", "openstack.url", DEFAULT_AUTH_URLS[provider])
+ username = config.get_config("storage", "openstack.username")
+ reauth_period = 23*60*60 #seconds
+
+ auth_client = AuthenticationClient(api_key, provider, auth_service_url, username, reauth_period)
+ return OpenStackContainer(auth_client)
+
- return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken)
+class UnexpectedAuthenticationResponse(Exception):
+ def __init__(self, msg, response_code, response_headers):
+ Exception.__init__(self, msg)
+ self.response_code = response_code
+ self.response_headers = response_headers
-class S3Container(ContainerRetryMixin, ContainerListMixin):
+class AuthenticationInfo(object):
+ def __init__(self, storage_url, cdn_management_url, auth_token):
+ self.storage_url = storage_url
+ self.cdn_management_url = cdn_management_url
+ self.auth_token = auth_token
+
+
+class AuthenticationClient(object):
+ """
+ I implement a client for the Rackspace authentication service.
+ It is not clear whether this is also implemented by other OpenStack providers.
+ """
+ def __init__(self, api_key, provider, auth_service_url, username, reauth_period, override_reactor=None):
+ self.api_key = api_key
+ self.auth_service_url = auth_service_url
+ self.username = username
+ self.reauth_period = reauth_period
+ self.reactor = override_reactor or reactor
+ self.agent = Agent(self.reactor)
+
+ _assert(provider.startswith("rackspace"), provider=provider)
+ self._authenticate = self._authenticate_to_rackspace
+
+ # Not authorized yet.
+ self.auth_info = None
+ self.first_auth_lock = defer.DeferredLock()
+ d = self.get_auth_info()
+ d.addBoth(lambda ign: None)
+
+ def get_auth_info(self):
+ # It is intentional that this returns the previous auth_info while a reauthentication is in progress.
+ if self.auth_info is not None:
+ return defer.succeed(self.auth_info)
+ else:
+ return self.get_auth_info_locked()
+
+ def get_auth_info_locked(self, suppress_errors=False):
+ d = self.first_auth_lock.acquire()
+ d.addCallback(self._authenticate)
+ def _release(res):
+ self.first_auth_lock.release()
+ return res
+ d.addBoth(_release)
+ d.addCallback(lambda ign: self.auth_info)
+ if suppress_errors:
+ d.addErrback(lambda ign: self.auth_info)
+ return d
+
+ def _authenticate_to_rackspace(self, ign=None):
+ # <http://docs.rackspace.com/files/api/v1/cf-devguide/content/Authentication-d1e639.html>
+
+ # Agent.request adds a Host header automatically based on the URL.
+ request_headers = {
+ 'User-Agent': ['Tahoe-LAFS authentication client'],
+ 'X-Auth-User': [self.username],
+ 'X-Auth-Key': [self.api_key],
+ }
+ log.msg("GET %s %r" % (self.auth_service_url, request_headers))
+ d = defer.succeed(None)
+ d.addCallback(lambda ign: self.agent.request('GET', self.auth_service_url, Headers(request_headers), None))
+
+ def _got_response(response):
+ log.msg("OpenStack auth response: %r %s" % (response.code, response.phrase))
+ # "any 2xx response is a good response"
+ if response.code < 200 or response.code >= 300:
+ raise UnexpectedAuthenticationResponse("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
+
+ def _get_header(name):
+ hs = response.headers.getRawHeaders(name)
+ if len(hs) == 0:
+ raise UnexpectedAuthenticationResponse("missing response header %r" % (name,),
+ response.code, response.headers)
+ return hs[0]
+
+ storage_url = _get_header('X-Storage-Url')
+ cdn_management_url = _get_header('X-CDN-Management-Url')
+ auth_token = _get_header('X-Auth-Token')
+ # Don't log this unless debugging, since auth_token is a secret.
+ #log.msg("Auth response is %s %s %s" % (storage_url, cdn_management_url, auth_token))
+ self.auth_info = AuthenticationInfo(storage_url, cdn_management_url, auth_token)
+
+ self.reactor.callLater(self.reauth_period, self.get_auth_info_locked, suppress_errors=True)
+ d.addCallback(_got_response)
+ def _failed(f):
+ self.auth_info = None
+ # do we need to retry?
+ log.err(f)
+ return f
+ d.addErrback(_failed)
+ return d
+
+
+class OpenStackContainer(ContainerRetryMixin, ContainerListMixin):
implements(IContainer)
"""
- I represent a real S3 container (bucket), accessed using the txaws library.
+ I represent a real OpenStack container.
"""
- def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None):
- # We only depend on txaws when this class is actually instantiated.
- from txaws.credentials import AWSCredentials
- from txaws.service import AWSServiceEndpoint
- from txaws.s3.client import S3Client, Query
- from txaws.s3.exception import S3Error
-
- creds = AWSCredentials(access_key=access_key, secret_key=secret_key)
- endpoint = AWSServiceEndpoint(uri=url)
-
- query_factory = None
- if usertoken is not None:
- def make_query(*args, **kwargs):
- amz_headers = kwargs.get("amz_headers", {})
- if producttoken is not None:
- amz_headers["security-token"] = (usertoken, producttoken)
- else:
- amz_headers["security-token"] = usertoken
- kwargs["amz_headers"] = amz_headers
-
- return Query(*args, **kwargs)
- query_factory = make_query
-
- self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory)
- self.container_name = container_name
- self.ServiceError = S3Error
+ def __init__(self, auth_client):
+ self.auth_client = auth_client
+
+ #self.client = OpenStackClient(auth_client)
+ #self.ServiceError = OpenStackError
def __repr__(self):
- return ("<%s %r>" % (self.__class__.__name__, self.container_name,))
+ return ("<%s>" % (self.__class__.__name__,))
def create(self):
return self._do_request('create bucket', self.client.create, self.container_name)