From 81393b149a3bc1b8b8560201bc73a1ba71559eeb Mon Sep 17 00:00:00 2001
From: David-Sarah Hopwood <david-sarah@jacaranda.org>
Date: Thu, 21 Feb 2013 22:04:16 +0000
Subject: [PATCH] Cloud backend: move potentially reusable HTTP request
 utilities to cloud_common.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
---
 .../storage/backends/cloud/cloud_common.py    |  87 +++++++++++++
 .../cloud/openstack/openstack_container.py    | 117 ++++--------------
 2 files changed, 112 insertions(+), 92 deletions(-)

diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py
index 3d38f230..0a8b12be 100644
--- a/src/allmydata/storage/backends/cloud/cloud_common.py
+++ b/src/allmydata/storage/backends/cloud/cloud_common.py
@@ -1,9 +1,13 @@
 
 from collections import deque
+from cStringIO import StringIO
 
 from twisted.internet import defer, reactor, task
 from twisted.python.failure import Failure
 from twisted.web.error import Error
+from twisted.web.client import FileBodyProducer, ResponseDone
+from twisted.web.http_headers import Headers
+from twisted.internet.protocol import Protocol
 
 from zope.interface import Interface, implements
 from allmydata.interfaces import IShareBase
@@ -601,3 +605,86 @@ class ChunkCache(object):
     def close(self):
         self._cachemap = None
         return self._pipeline.close()
+
+
+class Discard(Protocol):
+    # see http://twistedmatrix.com/trac/ticket/5488
+    def makeConnection(self, producer):
+        producer.stopProducing()
+
+
+class DataCollector(Protocol):
+    def __init__(self, ServiceError):
+        self._data = deque()
+        self._done = defer.Deferred()
+        self.ServiceError = ServiceError
+
+    def dataReceived(self, bytes):
+        self._data.append(bytes)
+
+    def connectionLost(self, reason):
+        if reason.check(ResponseDone):
+            eventually_callback(self._done)("".join(self._data))
+        else:
+            def _failed(): raise self.ServiceError(None, 0, message=reason.getErrorMessage())
+            eventually_errback(self._done)(defer.execute(_failed))
+
+    def when_done(self):
+        """CAUTION: this always returns the same Deferred."""
+        return self._done
+
+
+class HTTPClientMixin:
+    """
+    I implement helper methods for making HTTP requests and getting response headers.
+
+    Subclasses should define:
+      _agent:
+          The instance of twisted.web.client.Agent to be used.
+      USER_AGENT:
+          User agent string.
+      ServiceError:
+          The error class to trap (CloudServiceError or similar).
+    """
+    def _http_request(self, what, method, url, request_headers, body=None, need_response_body=False):
+        # Agent.request adds a Host header automatically based on the URL.
+        request_headers['User-Agent'] = [self.USER_AGENT]
+
+        if body is None:
+            bodyProducer = None
+        else:
+            bodyProducer = FileBodyProducer(StringIO(body))
+            # We don't need to explicitly set Content-Length because FileBodyProducer knows the length
+            # (and if we do it won't work, because in that case Content-Length would be duplicated).
+
+        log.msg(format="%(what)s request: %(method)s %(url)s %(header_keys)s",
+                what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL)
+
+        d = defer.maybeDeferred(self._agent.request, method, url, Headers(request_headers), bodyProducer)
+
+        def _got_response(response):
+            log.msg(format="%(what)s response: %(code)d %(phrase)s",
+                    what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+
+            if response.code < 200 or response.code >= 300:
+                raise self.ServiceError(None, response.code,
+                                        message="unexpected response code %r %s" % (response.code, response.phrase))
+
+            if need_response_body:
+                collector = DataCollector(self.ServiceError)
+                response.deliverBody(collector)
+                d2 = collector.when_done()
+                d2.addCallback(lambda body: (response, body))
+                return d2
+            else:
+                response.deliverBody(Discard())
+                return (response, None)
+        d.addCallback(_got_response)
+        return d
+
+    def _get_header(self, response, name):
+        hs = response.headers.getRawHeaders(name)
+        if len(hs) == 0:
+            raise self.ServiceError(None, response.code,
+                                    message="missing response header %r" % (name,))
+        return hs[0]
diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
index 7aedf175..617ed163 100644
--- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
+++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
@@ -1,14 +1,9 @@
 
 import urllib, simplejson
-from cStringIO import StringIO
-from collections import deque
 
 from twisted.internet import defer, reactor
-from allmydata.util.deferredutil import eventually_callback, eventually_errback
 
-from twisted.internet.protocol import Protocol
-from twisted.web.client import Agent, FileBodyProducer, ResponseDone
-from twisted.web.http_headers import Headers
+from twisted.web.client import Agent
 from twisted.web.http import UNAUTHORIZED
 
 from zope.interface import implements, Interface
@@ -16,7 +11,8 @@ from zope.interface import implements, Interface
 from allmydata.util import log
 from allmydata.node import InvalidValueError
 from allmydata.storage.backends.cloud.cloud_common import IContainer, \
-     CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin
+     CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin, \
+     HTTPClientMixin
 
 
 # Enabling this will cause secrets to be logged.
@@ -57,55 +53,11 @@ class AuthenticationInfo(object):
         self.internal_storage_url = internal_storage_url
 
 
-def _http_request(what, agent, method, url, request_headers, body=None, need_response_body=False):
-    # Agent.request adds a Host header automatically based on the URL.
-    request_headers['User-Agent'] = [USER_AGENT]
-
-    if body is None:
-        bodyProducer = None
-    else:
-        bodyProducer = FileBodyProducer(StringIO(body))
-        # We don't need to explicitly set Content-Length because FileBodyProducer knows the length
-        # (and if we do it won't work, because in that case Content-Length would be duplicated).
-
-    log.msg(format="OpenStack %(what)s request %(method)s %(url)s %(header_keys)s",
-            what=what, method=method, url=url, header_keys=repr(request_headers.keys()), level=log.OPERATIONAL)
-
-    d = defer.maybeDeferred(agent.request, method, url, Headers(request_headers), bodyProducer)
-
-    def _got_response(response):
-        log.msg(format="OpenStack %(what)s response: %(code)d %(phrase)s",
-                what=what, code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
-
-        if response.code < 200 or response.code >= 300:
-            raise CloudServiceError(None, response.code,
-                                    message="unexpected response code %r %s" % (response.code, response.phrase))
-
-        if need_response_body:
-            collector = DataCollector()
-            response.deliverBody(collector)
-            d2 = collector.when_done()
-            d2.addCallback(lambda body: (response, body))
-            return d2
-        else:
-            response.deliverBody(Discard())
-            return (response, None)
-    d.addCallback(_got_response)
-    return d
-
-def _get_header(response, name):
-    hs = response.headers.getRawHeaders(name)
-    if len(hs) == 0:
-        raise CloudServiceError(None, response.code,
-                                message="missing response header %r" % (name,))
-    return hs[0]
-
-
 class IAuthenticator(Interface):
     def make_auth_request():
         """Returns (method, url, headers, body, need_response_body)."""
 
-    def parse_auth_response(response, body):
+    def parse_auth_response(response, get_header, body):
         """Returns AuthenticationInfo."""
 
 
@@ -128,10 +80,10 @@ class AuthenticatorV1(object):
         }
         return ('GET', self._auth_service_url, request_headers, None, False)
 
-    def parse_auth_response(self, response, body):
-        auth_token = _get_header(response, 'X-Auth-Token')
-        storage_url = _get_header(response, 'X-Storage-Url')
-        #cdn_management_url = _get_header(response, 'X-CDN-Management-Url')
+    def parse_auth_response(self, response, get_header, body):
+        auth_token = get_header(response, 'X-Auth-Token')
+        storage_url = get_header(response, 'X-Storage-Url')
+        #cdn_management_url = get_header(response, 'X-CDN-Management-Url')
         return AuthenticationInfo(auth_token, storage_url)
 
 
@@ -168,7 +120,7 @@ class AuthenticatorV2(object):
         }
         return ('POST', self._auth_service_url, request_headers, json, True)
 
-    def parse_auth_response(self, response, body):
+    def parse_auth_response(self, response, get_header, body):
         try:
             decoded_body = simplejson.loads(body)
         except simplejson.decoder.JSONDecodeError, e:
@@ -201,18 +153,21 @@ class AuthenticatorV2(object):
                                 message="could not find a suitable storage endpoint in auth response")
 
 
-class AuthenticationClient(object):
+class AuthenticationClient(HTTPClientMixin):
     """
     I implement a generic authentication client.
     The construction of the auth request and parsing of the response is delegated to an authenticator.
     """
 
+    USER_AGENT = "Tahoe-LAFS OpenStack authentication client"
+
     def __init__(self, authenticator, reauth_period, override_reactor=None):
         self._authenticator = authenticator
         self._reauth_period = reauth_period
         self._reactor = override_reactor or reactor
         self._agent = Agent(self._reactor)
         self._shutdown = False
+        self.ServiceError = CloudServiceError
 
         # Not authorized yet.
         self._auth_info = None
@@ -238,9 +193,9 @@ class AuthenticationClient(object):
     def _authenticate(self):
         (method, url, request_headers, body, need_response_body) = self._authenticator.make_auth_request()
 
-        d = _http_request("auth", self._agent, method, url, request_headers, body, need_response_body)
+        d = self._http_request("OpenStack auth", method, url, request_headers, body, need_response_body)
         def _got_response( (response, body) ):
-            self._auth_info = self._authenticator.parse_auth_response(response, body)
+            self._auth_info = self._authenticator.parse_auth_response(response, self._get_header, body)
             if UNSAFE_DEBUG:
                 print "Auth response is %s %s" % (self._auth_info.auth_token, self._auth_info.public_storage_url)
 
@@ -270,35 +225,11 @@ class AuthenticationClient(object):
             self._delayed.cancel()
 
 
-class Discard(Protocol):
-    # see http://twistedmatrix.com/trac/ticket/5488
-    def makeConnection(self, producer):
-        producer.stopProducing()
-
-
-class DataCollector(Protocol):
-    def __init__(self):
-        self._data = deque()
-        self._done = defer.Deferred()
-
-    def dataReceived(self, bytes):
-        self._data.append(bytes)
-
-    def connectionLost(self, reason):
-        if reason.check(ResponseDone):
-            eventually_callback(self._done)("".join(self._data))
-        else:
-            def _failed(): raise CloudServiceError(None, 0, message=reason.getErrorMessage())
-            eventually_errback(self._done)(defer.execute(_failed))
-
-    def when_done(self):
-        """CAUTION: this always returns the same Deferred."""
-        return self._done
-
-
-class OpenStackContainer(ContainerRetryMixin):
+class OpenStackContainer(HTTPClientMixin, ContainerRetryMixin):
     implements(IContainer)
 
+    USER_AGENT = "Tahoe-LAFS OpenStack storage client"
+
     def __init__(self, auth_client, container_name, override_reactor=None):
         self._auth_client = auth_client
         self._container_name = container_name
@@ -351,7 +282,8 @@ class OpenStackContainer(ContainerRetryMixin):
             url = self._make_container_url(auth_info)
             if prefix:
                 url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),)
-            return _http_request("list objects", self._agent, 'GET', url, request_headers, None, need_response_body=True)
+            return self._http_request("OpenStack list objects", 'GET', url, request_headers,
+                                      need_response_body=True)
         d.addCallback(_do_list)
         d.addCallback(lambda (response, json): self._parse_list(response, json, prefix))
         return d
@@ -394,7 +326,7 @@ class OpenStackContainer(ContainerRetryMixin):
                 'Content-Type': [content_type],
             }
             url = self._make_object_url(auth_info, object_name)
-            return _http_request("put object", self._agent, 'PUT', url, request_headers, data)
+            return self._http_request("OpenStack put object", 'PUT', url, request_headers, data)
         d.addCallback(_do_put)
         d.addCallback(lambda ign: None)
         return d
@@ -409,7 +341,8 @@ class OpenStackContainer(ContainerRetryMixin):
                 'X-Auth-Token': [auth_info.auth_token],
             }
             url = self._make_object_url(auth_info, object_name)
-            return _http_request("get object", self._agent, 'GET', url, request_headers, need_response_body=True)
+            return self._http_request("OpenStack get object", 'GET', url, request_headers,
+                                      need_response_body=True)
         d.addCallback(_do_get)
         d.addCallback(lambda (response, body): body)
         return d
@@ -424,7 +357,7 @@ class OpenStackContainer(ContainerRetryMixin):
                 'X-Auth-Token': [auth_info.auth_token],
             }
             url = self._make_object_url(auth_info, object_name)
-            return _http_request("head object", self._agent, 'HEAD', url, request_headers)
+            return self._http_request("OpenStack head object", 'HEAD', url, request_headers)
         d.addCallback(_do_head)
         def _got_head_response( (response, body) ):
             print response
@@ -443,7 +376,7 @@ class OpenStackContainer(ContainerRetryMixin):
                 'X-Auth-Token': [auth_info.auth_token],
             }
             url = self._make_object_url(auth_info, object_name)
-            return _http_request("delete", self._agent, 'DELETE', url, request_headers)
+            return self._http_request("OpenStack delete object", 'DELETE', url, request_headers)
         d.addCallback(_do_delete)
         d.addCallback(lambda ign: None)
         return d
-- 
2.45.2