From 81393b149a3bc1b8b8560201bc73a1ba71559eeb Mon Sep 17 00:00:00 2001 From: David-Sarah Hopwood <david-sarah@jacaranda.org> Date: Thu, 21 Feb 2013 22:04:16 +0000 Subject: [PATCH] Cloud backend: move potentially reusable HTTP request utilities to cloud_common. Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org> --- .../storage/backends/cloud/cloud_common.py | 87 +++++++++++++ .../cloud/openstack/openstack_container.py | 117 ++++-------------- 2 files changed, 112 insertions(+), 92 deletions(-) diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py index 3d38f230..0a8b12be 100644 --- a/src/allmydata/storage/backends/cloud/cloud_common.py +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -1,9 +1,13 @@ from collections import deque +from cStringIO import StringIO from twisted.internet import defer, reactor, task from twisted.python.failure import Failure from twisted.web.error import Error +from twisted.web.client import FileBodyProducer, ResponseDone +from twisted.web.http_headers import Headers +from twisted.internet.protocol import Protocol from zope.interface import Interface, implements from allmydata.interfaces import IShareBase @@ -601,3 +605,86 @@ class ChunkCache(object): def close(self): self._cachemap = None return self._pipeline.close() + + +class Discard(Protocol): + # see http://twistedmatrix.com/trac/ticket/5488 + def makeConnection(self, producer): + producer.stopProducing() + + +class DataCollector(Protocol): + def __init__(self, ServiceError): + self._data = deque() + self._done = defer.Deferred() + self.ServiceError = ServiceError + + def dataReceived(self, bytes): + self._data.append(bytes) + + def connectionLost(self, reason): + if reason.check(ResponseDone): + eventually_callback(self._done)("".join(self._data)) + else: + def _failed(): raise self.ServiceError(None, 0, message=reason.getErrorMessage()) + eventually_errback(self._done)(defer.execute(_failed)) + + def when_done(self): + """CAUTION: this always returns the same Deferred.""" + return self._done + + +class HTTPClientMixin: + """ + I implement helper methods for making HTTP requests and getting response headers. + + Subclasses should define: + _agent: + The instance of twisted.web.client.Agent to be used. + USER_AGENT: + User agent string. + ServiceError: + The error class to trap (CloudServiceError or similar). + """ + def _http_request(self, what, method, url, request_headers, body=None, need_response_body=False): + # Agent.request adds a Host header automatically based on the URL. + request_headers['User-Agent'] = [self.USER_AGENT] + + if body is None: + bodyProducer = None + else: + bodyProducer = FileBodyProducer(StringIO(body)) + # We don't need to explicitly set Content-Length because FileBodyProducer knows the length + # (and if we do it won't work, because in that case Content-Length would be duplicated). + + log.msg(format="%(what)s request: %(method)s %(url)s %(header_keys)s", + what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL) + + d = defer.maybeDeferred(self._agent.request, method, url, Headers(request_headers), bodyProducer) + + def _got_response(response): + log.msg(format="%(what)s response: %(code)d %(phrase)s", + what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL) + + if response.code < 200 or response.code >= 300: + raise self.ServiceError(None, response.code, + message="unexpected response code %r %s" % (response.code, response.phrase)) + + if need_response_body: + collector = DataCollector(self.ServiceError) + response.deliverBody(collector) + d2 = collector.when_done() + d2.addCallback(lambda body: (response, body)) + return d2 + else: + response.deliverBody(Discard()) + return (response, None) + d.addCallback(_got_response) + return d + + def _get_header(self, response, name): + hs = response.headers.getRawHeaders(name) + if len(hs) == 0: + raise self.ServiceError(None, response.code, + message="missing response header %r" % (name,)) + return hs[0] diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py index 7aedf175..617ed163 100644 --- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py +++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py @@ -1,14 +1,9 @@ import urllib, simplejson -from cStringIO import StringIO -from collections import deque from twisted.internet import defer, reactor -from allmydata.util.deferredutil import eventually_callback, eventually_errback -from twisted.internet.protocol import Protocol -from twisted.web.client import Agent, FileBodyProducer, ResponseDone -from twisted.web.http_headers import Headers +from twisted.web.client import Agent from twisted.web.http import UNAUTHORIZED from zope.interface import implements, Interface @@ -16,7 +11,8 @@ from zope.interface import implements, Interface from allmydata.util import log from allmydata.node import InvalidValueError from allmydata.storage.backends.cloud.cloud_common import IContainer, \ - CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin + CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin, \ + HTTPClientMixin # Enabling this will cause secrets to be logged. @@ -57,55 +53,11 @@ class AuthenticationInfo(object): self.internal_storage_url = internal_storage_url -def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False): - # Agent.request adds a Host header automatically based on the URL. - request_headers['User-Agent'] = [USER_AGENT] - - if body is None: - bodyProducer = None - else: - bodyProducer = FileBodyProducer(StringIO(body)) - # We don't need to explicitly set Content-Length because FileBodyProducer knows the length - # (and if we do it won't work, because in that case Content-Length would be duplicated). - - log.msg(format="OpenStack %(what)s request %(method)s %(url)s %(header_keys)s", - what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL) - - d = defer.maybeDeferred(agent.request, method, url, Headers(request_headers), bodyProducer) - - def _got_response(response): - log.msg(format="OpenStack %(what)s response: %(code)d %(phrase)s", - what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL) - - if response.code < 200 or response.code >= 300: - raise CloudServiceError(None, response.code, - message="unexpected response code %r %s" % (response.code, response.phrase)) - - if need_response_body: - collector = DataCollector() - response.deliverBody(collector) - d2 = collector.when_done() - d2.addCallback(lambda body: (response, body)) - return d2 - else: - response.deliverBody(Discard()) - return (response, None) - d.addCallback(_got_response) - return d - -def _get_header(response, name): - hs = response.headers.getRawHeaders(name) - if len(hs) == 0: - raise CloudServiceError(None, response.code, - message="missing response header %r" % (name,)) - return hs[0] - - class IAuthenticator(Interface): def make_auth_request(): """Returns (method, url, headers, body, need_response_body).""" - def parse_auth_response(response, body): + def parse_auth_response(response, get_header, body): """Returns AuthenticationInfo.""" @@ -128,10 +80,10 @@ class AuthenticatorV1(object): } return ('GET', self._auth_service_url, request_headers, None, False) - def parse_auth_response(self, response, body): - auth_token = _get_header(response, 'X-Auth-Token') - storage_url = _get_header(response, 'X-Storage-Url') - #cdn_management_url = _get_header(response, 'X-CDN-Management-Url') + def parse_auth_response(self, response, get_header, body): + auth_token = get_header(response, 'X-Auth-Token') + storage_url = get_header(response, 'X-Storage-Url') + #cdn_management_url = get_header(response, 'X-CDN-Management-Url') return AuthenticationInfo(auth_token, storage_url) @@ -168,7 +120,7 @@ class AuthenticatorV2(object): } return ('POST', self._auth_service_url, request_headers, json, True) - def parse_auth_response(self, response, body): + def parse_auth_response(self, response, get_header, body): try: decoded_body = simplejson.loads(body) except simplejson.decoder.JSONDecodeError, e: @@ -201,18 +153,21 @@ class AuthenticatorV2(object): message="could not find a suitable storage endpoint in auth response") -class AuthenticationClient(object): +class AuthenticationClient(HTTPClientMixin): """ I implement a generic authentication client. The construction of the auth request and parsing of the response is delegated to an authenticator. """ + USER_AGENT = "Tahoe-LAFS OpenStack authentication client" + def __init__(self, authenticator, reauth_period, override_reactor=None): self._authenticator = authenticator self._reauth_period = reauth_period self._reactor = override_reactor or reactor self._agent = Agent(self._reactor) self._shutdown = False + self.ServiceError = CloudServiceError # Not authorized yet. self._auth_info = None @@ -238,9 +193,9 @@ class AuthenticationClient(object): def _authenticate(self): (method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request() - d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body) + d = self._http_request("OpenStack auth", method, url, request_headers, body, need_response_body) def _got_response( (response, body) ): - self._auth_info = self._authenticator.parse_auth_response(response, body) + self._auth_info = self._authenticator.parse_auth_response(response, self._get_header, body) if UNSAFE_DEBUG: print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url) @@ -270,35 +225,11 @@ class AuthenticationClient(object): self._delayed.cancel() -class Discard(Protocol): - # see http://twistedmatrix.com/trac/ticket/5488 - def makeConnection(self, producer): - producer.stopProducing() - - -class DataCollector(Protocol): - def __init__(self): - self._data = deque() - self._done = defer.Deferred() - - def dataReceived(self, bytes): - self._data.append(bytes) - - def connectionLost(self, reason): - if reason.check(ResponseDone): - eventually_callback(self._done)("".join(self._data)) - else: - def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage()) - eventually_errback(self._done)(defer.execute(_failed)) - - def when_done(self): - """CAUTION: this always returns the same Deferred.""" - return self._done - - -class OpenStackContainer(ContainerRetryMixin): +class OpenStackContainer(HTTPClientMixin, ContainerRetryMixin): implements(IContainer) + USER_AGENT = "Tahoe-LAFS OpenStack storage client" + def __init__(self, auth_client, container_name, override_reactor=None): self._auth_client = auth_client self._container_name = container_name @@ -351,7 +282,8 @@ class OpenStackContainer(ContainerRetryMixin): url = self._make_container_url(auth_info) if prefix: url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),) - return _http_request("list objects", self._agent, 'GET', url, request_headers, None, need_response_body=True) + return self._http_request("OpenStack list objects", 'GET', url, request_headers, + need_response_body=True) d.addCallback(_do_list) d.addCallback(lambda (response, json): self._parse_list(response, json, prefix)) return d @@ -394,7 +326,7 @@ class OpenStackContainer(ContainerRetryMixin): 'Content-Type': [content_type], } url = self._make_object_url(auth_info, object_name) - return _http_request("put object", self._agent, 'PUT', url, request_headers, data) + return self._http_request("OpenStack put object", 'PUT', url, request_headers, data) d.addCallback(_do_put) d.addCallback(lambda ign: None) return d @@ -409,7 +341,8 @@ class OpenStackContainer(ContainerRetryMixin): 'X-Auth-Token': [auth_info.auth_token], } url = self._make_object_url(auth_info, object_name) - return _http_request("get object", self._agent, 'GET', url, request_headers, need_response_body=True) + return self._http_request("OpenStack get object", 'GET', url, request_headers, + need_response_body=True) d.addCallback(_do_get) d.addCallback(lambda (response, body): body) return d @@ -424,7 +357,7 @@ class OpenStackContainer(ContainerRetryMixin): 'X-Auth-Token': [auth_info.auth_token], } url = self._make_object_url(auth_info, object_name) - return _http_request("head object", self._agent, 'HEAD', url, request_headers) + return self._http_request("OpenStack head object", 'HEAD', url, request_headers) d.addCallback(_do_head) def _got_head_response( (response, body) ): print response @@ -443,7 +376,7 @@ class OpenStackContainer(ContainerRetryMixin): 'X-Auth-Token': [auth_info.auth_token], } url = self._make_object_url(auth_info, object_name) - return _http_request("delete", self._agent, 'DELETE', url, request_headers) + return self._http_request("OpenStack delete object", 'DELETE', url, request_headers) d.addCallback(_do_delete) d.addCallback(lambda ign: None) return d -- 2.45.2