From: David-Sarah Hopwood <david-sarah@jacaranda.org>
Date: Wed, 13 Feb 2013 22:08:46 +0000 (+0000)
Subject: OpenStack: mostly complete implementation of OpenStackContainer.
X-Git-Url: https://git.rkrishnan.org/simplejson/components/%22file:/something?a=commitdiff_plain;h=7823ac35f7439d4bbed9ce74308b3525d1bba40c;p=tahoe-lafs%2Ftahoe-lafs.git

OpenStack: mostly complete implementation of OpenStackContainer.

Signed-off-by: David-Sarah Hopwood <david-sarah@jacaranda.org>
---

diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
index 8f978814..8b9972bd 100644
--- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
+++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py
@@ -1,6 +1,13 @@
 
+import urllib, simplejson
+from cStringIO import StringIO
+from collections import deque
+
 from twisted.internet import defer, reactor
-from twisted.web.client import Agent
+from allmydata.util.deferredutil import eventually_callback, eventually_errback
+
+from twisted.internet.protocol import Protocol
+from twisted.web.client import Agent, FileBodyProducer, ResponseDone
 from twisted.web.http_headers import Headers
 
 from zope.interface import implements
@@ -9,7 +16,10 @@ from allmydata.util import log
 from allmydata.util.assertutil import _assert
 from allmydata.node import InvalidValueError
 from allmydata.storage.backends.cloud.cloud_common import IContainer, \
-     ContainerRetryMixin, ContainerListMixin
+     ContainerRetryMixin
+
+# move this
+from allmydata.storage.backends.cloud.mock_cloud import ContainerItem, ContainerListing
 
 
 # Enabling this will cause secrets to be logged.
@@ -145,61 +155,229 @@ class AuthenticationClient(object):
             self._delayed.cancel()
 
 
-class OpenStackContainer(ContainerRetryMixin, ContainerListMixin):
+class OpenStackError(Exception):
+    pass
+
+
+class Discard(Protocol):
+    # see http://twistedmatrix.com/trac/ticket/5488
+    def makeConnection(self, producer):
+        producer.stopProducing()
+
+
+class DataCollector(Protocol):
+    def __init__(self):
+        self._data = deque()
+        self._done = defer.Deferred()
+
+    def dataReceived(self, bytes):
+        print 'Got %d bytes' % (len(bytes),)
+        self._data.append(bytes)
+
+    def connectionLost(self, reason):
+        print 'Finished receiving body: %s' % (reason.getErrorMessage(),)
+        #reason.raiseException()
+        if reason.check(ResponseDone):
+            eventually_callback(self._done)("".join(self._data))
+        else:
+            def _failed(): raise OpenStackError(reason.getErrorMessage())
+            eventually_errback(self._done)(defer.execute(_failed))
+
+    def when_done(self):
+        """CAUTION: this always returns the same Deferred."""
+        return self._done
+
+
+class OpenStackContainer(ContainerRetryMixin):
     implements(IContainer)
-    """
-    I represent a real OpenStack container.
-    """
 
-    def __init__(self, auth_client, container_name):
+    USER_AGENT = 'Tahoe-LAFS OpenStack client'
+
+    def __init__(self, auth_client, container_name, override_reactor=None):
         self._auth_client = auth_client
         self._container_name = container_name
-
-        #self.client = OpenStackClient(auth_client)
-        #self.ServiceError = OpenStackError
+        self._reactor = override_reactor or reactor
+        self._agent = Agent(self._reactor)
+        self.ServiceError = OpenStackError
 
     def __repr__(self):
-        return ("<%s>" % (self.__class__.__name__,))
+        return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
+
+    def _make_container_url(self, auth_info):
+        return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''))
+
+    def _make_object_url(self, auth_info, object_name):
+        return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name))
+
+    def _create(self):
+        """
+        Create this container.
+        """
+        raise NotImplementedError
+
+    def _delete(self):
+        """
+        Delete this container.
+        The cloud service may require the container to be empty before it can be deleted.
+        """
+        raise NotImplementedError
+
+    def _list_objects(self, prefix=''):
+        """
+        Get a ContainerListing that lists objects in this container.
+
+        prefix: (str) limit the returned keys to those starting with prefix.
+        """
+        d = self._auth_client.get_auth_info()
+        def _do_list(auth_info):
+            request_headers = {
+                'User-Agent': [self.USER_AGENT],
+                'X-Auth-Token': [auth_info.auth_token],
+            }
+            url = self._make_container_url(auth_info)
+            if prefix:
+                url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),)
+            log.msg(format="OpenStack list GET %(url)s", url=url, level=log.OPERATIONAL)
+            return self._agent.request('GET', url, Headers(request_headers), None)
+        d.addCallback(_do_list)
+        def _got_list_response(response):
+            log.msg(format="OpenStack list GET response: %(code)d %(phrase)s",
+                    code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+            if response.code < 200 or response.code >= 300:
+                raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+                                     response.code, response.headers)
+
+            collector = DataCollector()
+            response.deliverBody(collector)
+            return collector.when_done()
+        d.addCallback(_got_list_response)
+        def _parse_list(json):
+            items = simplejson.loads(json)
+            log.msg(format="OpenStack list GET read %(length)d bytes, parsed as %(items)d items",
+                    length=len(json), items=len(items), level=log.OPERATIONAL)
+
+            def _make_containeritem(item):
+                try:
+                    key = item['name']
+                    size = item['bytes']
+                    modification_date = item['last_modified']
+                    etag = item['hash']
+                    storage_class = 'STANDARD'
+                except KeyError, e:
+                    raise OpenStackError(str(e))
+                else:
+                    return ContainerItem(key, modification_date, etag, size, storage_class)
+
+            contents = map(_make_containeritem, items)
+            return ContainerListing(self._container_name, prefix, None, 10000, False, contents=contents)
+        d.addCallback(_parse_list)
+        return d
+
+    def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+        """
+        Put an object in this bucket.
+        Any existing object of the same name will be replaced.
+        """
+        d = self._auth_client.get_auth_info()
+        def _do_put(auth_info):
+            content_length = len(data)
+            request_headers = {
+                'User-Agent': [self.USER_AGENT],
+                'X-Auth-Token': [auth_info.auth_token],
+                'Content-Type': [content_type],
+                'Content-Length': [content_length],
+            }
+            producer = FileBodyProducer(StringIO(data))
+            url = self._make_object_url(auth_info, object_name)
+            log.msg(format="OpenStack PUT %(url)s %(content_length)d",
+                    url=url, content_length=content_length, level=log.OPERATIONAL)
+            return self._agent.request('PUT', url, Headers(request_headers), producer)
+        d.addCallback(_do_put)
+        def _got_put_response(response):
+            log.msg(format="OpenStack PUT response: %(code)d %(phrase)s",
+                    code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+            if response.code < 200 or response.code >= 300:
+                raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+                                     response.code, response.headers)
+            response.deliverBody(Discard())
+        d.addCallback(_got_put_response)
+        return d
+
+    def _get_object(self, object_name):
+        """
+        Get an object from this container.
+        """
+        d = self._auth_client.get_auth_info()
+        def _do_get(auth_info):
+            request_headers = {
+                'User-Agent': [self.USER_AGENT],
+                'X-Auth-Token': [auth_info.auth_token],
+            }
+            url = self._make_object_url(auth_info, object_name)
+            log.msg(format="OpenStack GET %(url)s", url=url, level=log.OPERATIONAL)
+            return self._agent.request('GET', url, Headers(request_headers), None)
+        d.addCallback(_do_get)
+        def _got_get_response(response):
+            log.msg(format="OpenStack GET response: %(code)d %(phrase)s",
+                    code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+            if response.code < 200 or response.code >= 300:
+                raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+                                     response.code, response.headers)
+
+            collector = DataCollector()
+            response.deliverBody(collector)
+            return collector.when_done()
+        d.addCallback(_got_get_response)
+        return d
+
+    def _head_object(self, object_name):
+        """
+        Retrieve object metadata only.
+        """
+        raise NotImplementedError
+
+    def _delete_object(self, object_name):
+        """
+        Delete an object from this container.
+        Once deleted, there is no method to restore or undelete an object.
+        """
+        d = self._auth_client.get_auth_info()
+        def _do_delete(auth_info):
+            request_headers = {
+                'User-Agent': [self.USER_AGENT],
+                'X-Auth-Token': [auth_info.auth_token],
+            }
+            url = self._make_object_url(auth_info, object_name)
+            log.msg(format="OpenStack DELETE %(url)s", url=url, level=log.OPERATIONAL)
+            return self._agent.request('DELETE', url, Headers(request_headers), None)
+        d.addCallback(_do_delete)
+        def _got_delete_response(response):
+            log.msg(format="OpenStack DELETE response: %(code)d %(phrase)s",
+                    code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+            if response.code < 200 or response.code >= 300:
+                raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+                                     response.code, response.headers)
+            response.deliverBody(Discard())
+        d.addCallback(_got_delete_response)
+        return d
 
     def create(self):
-        return self._do_request('create bucket', self.client.create, self.container_name)
+        return self._do_request('create container', self._create)
 
     def delete(self):
-        return self._do_request('delete bucket', self.client.delete, self.container_name)
+        return self._do_request('delete container', self._delete)
 
-    def list_some_objects(self, **kwargs):
-        return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs)
+    def list_objects(self, prefix=''):
+        return self._do_request('list objects', self._list_objects, prefix)
 
     def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
-        return self._do_request('PUT object', self.client.put_object, self.container_name,
-                                object_name, data, content_type, metadata)
+        return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
 
     def get_object(self, object_name):
-        return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
+        return self._do_request('GET object', self._get_object, object_name)
 
     def head_object(self, object_name):
-        return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
+        return self._do_request('HEAD object', self._head_object, object_name)
 
     def delete_object(self, object_name):
-        return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
-
-    def put_policy(self, policy):
-        """
-        Set access control policy on a bucket.
-        """
-        query = self.client.query_factory(
-            action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
-            bucket=self.container_name, object_name='?policy', data=policy)
-        return self._do_request('PUT policy', query.submit)
-
-    def get_policy(self):
-        query = self.client.query_factory(
-            action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
-            bucket=self.container_name, object_name='?policy')
-        return self._do_request('GET policy', query.submit)
-
-    def delete_policy(self):
-        query = self.client.query_factory(
-            action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
-            bucket=self.container_name, object_name='?policy')
-        return self._do_request('DELETE policy', query.submit)
+        return self._do_request('DELETE object', self._delete_object, object_name)