from collections import deque
+from cStringIO import StringIO
from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.web.error import Error
+from twisted.web.client import FileBodyProducer, ResponseDone
+from twisted.web.http_headers import Headers
+from twisted.internet.protocol import Protocol
from zope.interface import Interface, implements
from allmydata.interfaces import IShareBase
def close(self):
self._cachemap = None
return self._pipeline.close()
+
+
+class Discard(Protocol):
+ # see http://twistedmatrix.com/trac/ticket/5488
+ def makeConnection(self, producer):
+ producer.stopProducing()
+
+
+class DataCollector(Protocol):
+ def __init__(self, ServiceError):
+ self._data = deque()
+ self._done = defer.Deferred()
+ self.ServiceError = ServiceError
+
+ def dataReceived(self, bytes):
+ self._data.append(bytes)
+
+ def connectionLost(self, reason):
+ if reason.check(ResponseDone):
+ eventually_callback(self._done)("".join(self._data))
+ else:
+ def _failed(): raise self.ServiceError(None, 0, message=reason.getErrorMessage())
+ eventually_errback(self._done)(defer.execute(_failed))
+
+ def when_done(self):
+ """CAUTION: this always returns the same Deferred."""
+ return self._done
+
+
+class HTTPClientMixin:
+ """
+ I implement helper methods for making HTTP requests and getting response headers.
+
+ Subclasses should define:
+ _agent:
+ The instance of twisted.web.client.Agent to be used.
+ USER_AGENT:
+ User agent string.
+ ServiceError:
+ The error class to trap (CloudServiceError or similar).
+ """
+ def _http_request(self, what, method, url, request_headers, body=None, need_response_body=False):
+ # Agent.request adds a Host header automatically based on the URL.
+ request_headers['User-Agent'] = [self.USER_AGENT]
+
+ if body is None:
+ bodyProducer = None
+ else:
+ bodyProducer = FileBodyProducer(StringIO(body))
+ # We don't need to explicitly set Content-Length because FileBodyProducer knows the length
+ # (and if we do it won't work, because in that case Content-Length would be duplicated).
+
+ log.msg(format="%(what)s request: %(method)s %(url)s %(header_keys)s",
+ what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL)
+
+ d = defer.maybeDeferred(self._agent.request, method, url, Headers(request_headers), bodyProducer)
+
+ def _got_response(response):
+ log.msg(format="%(what)s response: %(code)d %(phrase)s",
+ what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+
+ if response.code < 200 or response.code >= 300:
+ raise self.ServiceError(None, response.code,
+ message="unexpected response code %r %s" % (response.code, response.phrase))
+
+ if need_response_body:
+ collector = DataCollector(self.ServiceError)
+ response.deliverBody(collector)
+ d2 = collector.when_done()
+ d2.addCallback(lambda body: (response, body))
+ return d2
+ else:
+ response.deliverBody(Discard())
+ return (response, None)
+ d.addCallback(_got_response)
+ return d
+
+ def _get_header(self, response, name):
+ hs = response.headers.getRawHeaders(name)
+ if len(hs) == 0:
+ raise self.ServiceError(None, response.code,
+ message="missing response header %r" % (name,))
+ return hs[0]
import urllib, simplejson
-from cStringIO import StringIO
-from collections import deque
from twisted.internet import defer, reactor
-from allmydata.util.deferredutil import eventually_callback, eventually_errback
-from twisted.internet.protocol import Protocol
-from twisted.web.client import Agent, FileBodyProducer, ResponseDone
-from twisted.web.http_headers import Headers
+from twisted.web.client import Agent
from twisted.web.http import UNAUTHORIZED
from zope.interface import implements, Interface
from allmydata.util import log
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin
+ CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin, \
+ HTTPClientMixin
# Enabling this will cause secrets to be logged.
self.internal_storage_url = internal_storage_url
-def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False):
- # Agent.request adds a Host header automatically based on the URL.
- request_headers['User-Agent'] = [USER_AGENT]
-
- if body is None:
- bodyProducer = None
- else:
- bodyProducer = FileBodyProducer(StringIO(body))
- # We don't need to explicitly set Content-Length because FileBodyProducer knows the length
- # (and if we do it won't work, because in that case Content-Length would be duplicated).
-
- log.msg(format="OpenStack %(what)s request %(method)s %(url)s %(header_keys)s",
- what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL)
-
- d = defer.maybeDeferred(agent.request, method, url, Headers(request_headers), bodyProducer)
-
- def _got_response(response):
- log.msg(format="OpenStack %(what)s response: %(code)d %(phrase)s",
- what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
-
- if response.code < 200 or response.code >= 300:
- raise CloudServiceError(None, response.code,
- message="unexpected response code %r %s" % (response.code, response.phrase))
-
- if need_response_body:
- collector = DataCollector()
- response.deliverBody(collector)
- d2 = collector.when_done()
- d2.addCallback(lambda body: (response, body))
- return d2
- else:
- response.deliverBody(Discard())
- return (response, None)
- d.addCallback(_got_response)
- return d
-
-def _get_header(response, name):
- hs = response.headers.getRawHeaders(name)
- if len(hs) == 0:
- raise CloudServiceError(None, response.code,
- message="missing response header %r" % (name,))
- return hs[0]
-
-
class IAuthenticator(Interface):
def make_auth_request():
"""Returns (method, url, headers, body, need_response_body)."""
- def parse_auth_response(response, body):
+ def parse_auth_response(response, get_header, body):
"""Returns AuthenticationInfo."""
}
return ('GET', self._auth_service_url, request_headers, None, False)
- def parse_auth_response(self, response, body):
- auth_token = _get_header(response, 'X-Auth-Token')
- storage_url = _get_header(response, 'X-Storage-Url')
- #cdn_management_url = _get_header(response, 'X-CDN-Management-Url')
+ def parse_auth_response(self, response, get_header, body):
+ auth_token = get_header(response, 'X-Auth-Token')
+ storage_url = get_header(response, 'X-Storage-Url')
+ #cdn_management_url = get_header(response, 'X-CDN-Management-Url')
return AuthenticationInfo(auth_token, storage_url)
}
return ('POST', self._auth_service_url, request_headers, json, True)
- def parse_auth_response(self, response, body):
+ def parse_auth_response(self, response, get_header, body):
try:
decoded_body = simplejson.loads(body)
except simplejson.decoder.JSONDecodeError, e:
message="could not find a suitable storage endpoint in auth response")
-class AuthenticationClient(object):
+class AuthenticationClient(HTTPClientMixin):
"""
I implement a generic authentication client.
The construction of the auth request and parsing of the response is delegated to an authenticator.
"""
+ USER_AGENT = "Tahoe-LAFS OpenStack authentication client"
+
def __init__(self, authenticator, reauth_period, override_reactor=None):
self._authenticator = authenticator
self._reauth_period = reauth_period
self._reactor = override_reactor or reactor
self._agent = Agent(self._reactor)
self._shutdown = False
+ self.ServiceError = CloudServiceError
# Not authorized yet.
self._auth_info = None
def _authenticate(self):
(method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request()
- d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body)
+ d = self._http_request("OpenStack auth", method, url, request_headers, body, need_response_body)
def _got_response( (response, body) ):
- self._auth_info = self._authenticator.parse_auth_response(response, body)
+ self._auth_info = self._authenticator.parse_auth_response(response, self._get_header, body)
if UNSAFE_DEBUG:
print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url)
self._delayed.cancel()
-class Discard(Protocol):
- # see http://twistedmatrix.com/trac/ticket/5488
- def makeConnection(self, producer):
- producer.stopProducing()
-
-
-class DataCollector(Protocol):
- def __init__(self):
- self._data = deque()
- self._done = defer.Deferred()
-
- def dataReceived(self, bytes):
- self._data.append(bytes)
-
- def connectionLost(self, reason):
- if reason.check(ResponseDone):
- eventually_callback(self._done)("".join(self._data))
- else:
- def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage())
- eventually_errback(self._done)(defer.execute(_failed))
-
- def when_done(self):
- """CAUTION: this always returns the same Deferred."""
- return self._done
-
-
-class OpenStackContainer(ContainerRetryMixin):
+class OpenStackContainer(HTTPClientMixin, ContainerRetryMixin):
implements(IContainer)
+ USER_AGENT = "Tahoe-LAFS OpenStack storage client"
+
def __init__(self, auth_client, container_name, override_reactor=None):
self._auth_client = auth_client
self._container_name = container_name
url = self._make_container_url(auth_info)
if prefix:
url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),)
- return _http_request("list objects", self._agent, 'GET', url, request_headers, None, need_response_body=True)
+ return self._http_request("OpenStack list objects", 'GET', url, request_headers,
+ need_response_body=True)
d.addCallback(_do_list)
d.addCallback(lambda (response, json): self._parse_list(response, json, prefix))
return d
'Content-Type': [content_type],
}
url = self._make_object_url(auth_info, object_name)
- return _http_request("put object", self._agent, 'PUT', url, request_headers, data)
+ return self._http_request("OpenStack put object", 'PUT', url, request_headers, data)
d.addCallback(_do_put)
d.addCallback(lambda ign: None)
return d
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
- return _http_request("get object", self._agent, 'GET', url, request_headers, need_response_body=True)
+ return self._http_request("OpenStack get object", 'GET', url, request_headers,
+ need_response_body=True)
d.addCallback(_do_get)
d.addCallback(lambda (response, body): body)
return d
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
- return _http_request("head object", self._agent, 'HEAD', url, request_headers)
+ return self._http_request("OpenStack head object", 'HEAD', url, request_headers)
d.addCallback(_do_head)
def _got_head_response( (response, body) ):
print response
'X-Auth-Token': [auth_info.auth_token],
}
url = self._make_object_url(auth_info, object_name)
- return _http_request("delete", self._agent, 'DELETE', url, request_headers)
+ return self._http_request("OpenStack delete object", 'DELETE', url, request_headers)
d.addCallback(_do_delete)
d.addCallback(lambda ign: None)
return d