From: David-Sarah Hopwood Date: Wed, 13 Feb 2013 22:08:46 +0000 (+0000) Subject: OpenStack: mostly complete implementation of OpenStackContainer. X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/flags/module-simplejson.html?a=commitdiff_plain;h=7823ac35f7439d4bbed9ce74308b3525d1bba40c;p=tahoe-lafs%2Ftahoe-lafs.git OpenStack: mostly complete implementation of OpenStackContainer. Signed-off-by: David-Sarah Hopwood --- diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py index 8f978814..8b9972bd 100644 --- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py +++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py @@ -1,6 +1,13 @@ +import urllib, simplejson +from cStringIO import StringIO +from collections import deque + from twisted.internet import defer, reactor -from twisted.web.client import Agent +from allmydata.util.deferredutil import eventually_callback, eventually_errback + +from twisted.internet.protocol import Protocol +from twisted.web.client import Agent, FileBodyProducer, ResponseDone from twisted.web.http_headers import Headers from zope.interface import implements @@ -9,7 +16,10 @@ from allmydata.util import log from allmydata.util.assertutil import _assert from allmydata.node import InvalidValueError from allmydata.storage.backends.cloud.cloud_common import IContainer, \ - ContainerRetryMixin, ContainerListMixin + ContainerRetryMixin + +# move this +from allmydata.storage.backends.cloud.mock_cloud import ContainerItem, ContainerListing # Enabling this will cause secrets to be logged. @@ -145,61 +155,229 @@ class AuthenticationClient(object): self._delayed.cancel() -class OpenStackContainer(ContainerRetryMixin, ContainerListMixin): +class OpenStackError(Exception): + pass + + +class Discard(Protocol): + # see http://twistedmatrix.com/trac/ticket/5488 + def makeConnection(self, producer): + producer.stopProducing() + + +class DataCollector(Protocol): + def __init__(self): + self._data = deque() + self._done = defer.Deferred() + + def dataReceived(self, bytes): + print 'Got %d bytes' % (len(bytes),) + self._data.append(bytes) + + def connectionLost(self, reason): + print 'Finished receiving body: %s' % (reason.getErrorMessage(),) + #reason.raiseException() + if reason.check(ResponseDone): + eventually_callback(self._done)("".join(self._data)) + else: + def _failed(): raise OpenStackError(reason.getErrorMessage()) + eventually_errback(self._done)(defer.execute(_failed)) + + def when_done(self): + """CAUTION: this always returns the same Deferred.""" + return self._done + + +class OpenStackContainer(ContainerRetryMixin): implements(IContainer) - """ - I represent a real OpenStack container. - """ - def __init__(self, auth_client, container_name): + USER_AGENT = 'Tahoe-LAFS OpenStack client' + + def __init__(self, auth_client, container_name, override_reactor=None): self._auth_client = auth_client self._container_name = container_name - - #self.client = OpenStackClient(auth_client) - #self.ServiceError = OpenStackError + self._reactor = override_reactor or reactor + self._agent = Agent(self._reactor) + self.ServiceError = OpenStackError def __repr__(self): - return ("<%s>" % (self.__class__.__name__,)) + return ("<%s %r>" % (self.__class__.__name__, self._container_name,)) + + def _make_container_url(self, auth_info): + return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe='')) + + def _make_object_url(self, auth_info, object_name): + return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name)) + + def _create(self): + """ + Create this container. + """ + raise NotImplementedError + + def _delete(self): + """ + Delete this container. + The cloud service may require the container to be empty before it can be deleted. + """ + raise NotImplementedError + + def _list_objects(self, prefix=''): + """ + Get a ContainerListing that lists objects in this container. + + prefix: (str) limit the returned keys to those starting with prefix. + """ + d = self._auth_client.get_auth_info() + def _do_list(auth_info): + request_headers = { + 'User-Agent': [self.USER_AGENT], + 'X-Auth-Token': [auth_info.auth_token], + } + url = self._make_container_url(auth_info) + if prefix: + url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),) + log.msg(format="OpenStack list GET %(url)s", url=url, level=log.OPERATIONAL) + return self._agent.request('GET', url, Headers(request_headers), None) + d.addCallback(_do_list) + def _got_list_response(response): + log.msg(format="OpenStack list GET response: %(code)d %(phrase)s", + code=response.code, phrase=response.phrase, level=log.OPERATIONAL) + if response.code < 200 or response.code >= 300: + raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase), + response.code, response.headers) + + collector = DataCollector() + response.deliverBody(collector) + return collector.when_done() + d.addCallback(_got_list_response) + def _parse_list(json): + items = simplejson.loads(json) + log.msg(format="OpenStack list GET read %(length)d bytes, parsed as %(items)d items", + length=len(json), items=len(items), level=log.OPERATIONAL) + + def _make_containeritem(item): + try: + key = item['name'] + size = item['bytes'] + modification_date = item['last_modified'] + etag = item['hash'] + storage_class = 'STANDARD' + except KeyError, e: + raise OpenStackError(str(e)) + else: + return ContainerItem(key, modification_date, etag, size, storage_class) + + contents = map(_make_containeritem, items) + return ContainerListing(self._container_name, prefix, None, 10000, False, contents=contents) + d.addCallback(_parse_list) + return d + + def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): + """ + Put an object in this bucket. + Any existing object of the same name will be replaced. + """ + d = self._auth_client.get_auth_info() + def _do_put(auth_info): + content_length = len(data) + request_headers = { + 'User-Agent': [self.USER_AGENT], + 'X-Auth-Token': [auth_info.auth_token], + 'Content-Type': [content_type], + 'Content-Length': [content_length], + } + producer = FileBodyProducer(StringIO(data)) + url = self._make_object_url(auth_info, object_name) + log.msg(format="OpenStack PUT %(url)s %(content_length)d", + url=url, content_length=content_length, level=log.OPERATIONAL) + return self._agent.request('PUT', url, Headers(request_headers), producer) + d.addCallback(_do_put) + def _got_put_response(response): + log.msg(format="OpenStack PUT response: %(code)d %(phrase)s", + code=response.code, phrase=response.phrase, level=log.OPERATIONAL) + if response.code < 200 or response.code >= 300: + raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase), + response.code, response.headers) + response.deliverBody(Discard()) + d.addCallback(_got_put_response) + return d + + def _get_object(self, object_name): + """ + Get an object from this container. + """ + d = self._auth_client.get_auth_info() + def _do_get(auth_info): + request_headers = { + 'User-Agent': [self.USER_AGENT], + 'X-Auth-Token': [auth_info.auth_token], + } + url = self._make_object_url(auth_info, object_name) + log.msg(format="OpenStack GET %(url)s", url=url, level=log.OPERATIONAL) + return self._agent.request('GET', url, Headers(request_headers), None) + d.addCallback(_do_get) + def _got_get_response(response): + log.msg(format="OpenStack GET response: %(code)d %(phrase)s", + code=response.code, phrase=response.phrase, level=log.OPERATIONAL) + if response.code < 200 or response.code >= 300: + raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase), + response.code, response.headers) + + collector = DataCollector() + response.deliverBody(collector) + return collector.when_done() + d.addCallback(_got_get_response) + return d + + def _head_object(self, object_name): + """ + Retrieve object metadata only. + """ + raise NotImplementedError + + def _delete_object(self, object_name): + """ + Delete an object from this container. + Once deleted, there is no method to restore or undelete an object. + """ + d = self._auth_client.get_auth_info() + def _do_delete(auth_info): + request_headers = { + 'User-Agent': [self.USER_AGENT], + 'X-Auth-Token': [auth_info.auth_token], + } + url = self._make_object_url(auth_info, object_name) + log.msg(format="OpenStack DELETE %(url)s", url=url, level=log.OPERATIONAL) + return self._agent.request('DELETE', url, Headers(request_headers), None) + d.addCallback(_do_delete) + def _got_delete_response(response): + log.msg(format="OpenStack DELETE response: %(code)d %(phrase)s", + code=response.code, phrase=response.phrase, level=log.OPERATIONAL) + if response.code < 200 or response.code >= 300: + raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase), + response.code, response.headers) + response.deliverBody(Discard()) + d.addCallback(_got_delete_response) + return d def create(self): - return self._do_request('create bucket', self.client.create, self.container_name) + return self._do_request('create container', self._create) def delete(self): - return self._do_request('delete bucket', self.client.delete, self.container_name) + return self._do_request('delete container', self._delete) - def list_some_objects(self, **kwargs): - return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs) + def list_objects(self, prefix=''): + return self._do_request('list objects', self._list_objects, prefix) def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): - return self._do_request('PUT object', self.client.put_object, self.container_name, - object_name, data, content_type, metadata) + return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata) def get_object(self, object_name): - return self._do_request('GET object', self.client.get_object, self.container_name, object_name) + return self._do_request('GET object', self._get_object, object_name) def head_object(self, object_name): - return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name) + return self._do_request('HEAD object', self._head_object, object_name) def delete_object(self, object_name): - return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name) - - def put_policy(self, policy): - """ - Set access control policy on a bucket. - """ - query = self.client.query_factory( - action='PUT', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy', data=policy) - return self._do_request('PUT policy', query.submit) - - def get_policy(self): - query = self.client.query_factory( - action='GET', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy') - return self._do_request('GET policy', query.submit) - - def delete_policy(self): - query = self.client.query_factory( - action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy') - return self._do_request('DELETE policy', query.submit) + return self._do_request('DELETE object', self._delete_object, object_name)