+import urllib, simplejson
+from cStringIO import StringIO
+from collections import deque
+
from twisted.internet import defer, reactor
-from twisted.web.client import Agent
+from allmydata.util.deferredutil import eventually_callback, eventually_errback
+
+from twisted.internet.protocol import Protocol
+from twisted.web.client import Agent, FileBodyProducer, ResponseDone
from twisted.web.http_headers import Headers
from zope.interface import implements
from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- ContainerRetryMixin, ContainerListMixin
+ ContainerRetryMixin
+
+# move this
+from allmydata.storage.backends.cloud.mock_cloud import ContainerItem, ContainerListing
# Enabling this will cause secrets to be logged.
self._delayed.cancel()
-class OpenStackContainer(ContainerRetryMixin, ContainerListMixin):
+class OpenStackError(Exception):
+ pass
+
+
+class Discard(Protocol):
+ # see http://twistedmatrix.com/trac/ticket/5488
+ def makeConnection(self, producer):
+ producer.stopProducing()
+
+
+class DataCollector(Protocol):
+ def __init__(self):
+ self._data = deque()
+ self._done = defer.Deferred()
+
+ def dataReceived(self, bytes):
+ print 'Got %d bytes' % (len(bytes),)
+ self._data.append(bytes)
+
+ def connectionLost(self, reason):
+ print 'Finished receiving body: %s' % (reason.getErrorMessage(),)
+ #reason.raiseException()
+ if reason.check(ResponseDone):
+ eventually_callback(self._done)("".join(self._data))
+ else:
+ def _failed(): raise OpenStackError(reason.getErrorMessage())
+ eventually_errback(self._done)(defer.execute(_failed))
+
+ def when_done(self):
+ """CAUTION: this always returns the same Deferred."""
+ return self._done
+
+
+class OpenStackContainer(ContainerRetryMixin):
implements(IContainer)
- """
- I represent a real OpenStack container.
- """
- def __init__(self, auth_client, container_name):
+ USER_AGENT = 'Tahoe-LAFS OpenStack client'
+
+ def __init__(self, auth_client, container_name, override_reactor=None):
self._auth_client = auth_client
self._container_name = container_name
-
- #self.client = OpenStackClient(auth_client)
- #self.ServiceError = OpenStackError
+ self._reactor = override_reactor or reactor
+ self._agent = Agent(self._reactor)
+ self.ServiceError = OpenStackError
def __repr__(self):
- return ("<%s>" % (self.__class__.__name__,))
+ return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
+
+ def _make_container_url(self, auth_info):
+ return "%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''))
+
+ def _make_object_url(self, auth_info, object_name):
+ return "%s/%s/%s" % (auth_info.storage_url, urllib.quote(self._container_name, safe=''), urllib.quote(object_name))
+
+ def _create(self):
+ """
+ Create this container.
+ """
+ raise NotImplementedError
+
+ def _delete(self):
+ """
+ Delete this container.
+ The cloud service may require the container to be empty before it can be deleted.
+ """
+ raise NotImplementedError
+
+ def _list_objects(self, prefix=''):
+ """
+ Get a ContainerListing that lists objects in this container.
+
+ prefix: (str) limit the returned keys to those starting with prefix.
+ """
+ d = self._auth_client.get_auth_info()
+ def _do_list(auth_info):
+ request_headers = {
+ 'User-Agent': [self.USER_AGENT],
+ 'X-Auth-Token': [auth_info.auth_token],
+ }
+ url = self._make_container_url(auth_info)
+ if prefix:
+ url += "?format=json&prefix=%s" % (urllib.quote(prefix, safe=''),)
+ log.msg(format="OpenStack list GET %(url)s", url=url, level=log.OPERATIONAL)
+ return self._agent.request('GET', url, Headers(request_headers), None)
+ d.addCallback(_do_list)
+ def _got_list_response(response):
+ log.msg(format="OpenStack list GET response: %(code)d %(phrase)s",
+ code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+ if response.code < 200 or response.code >= 300:
+ raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
+
+ collector = DataCollector()
+ response.deliverBody(collector)
+ return collector.when_done()
+ d.addCallback(_got_list_response)
+ def _parse_list(json):
+ items = simplejson.loads(json)
+ log.msg(format="OpenStack list GET read %(length)d bytes, parsed as %(items)d items",
+ length=len(json), items=len(items), level=log.OPERATIONAL)
+
+ def _make_containeritem(item):
+ try:
+ key = item['name']
+ size = item['bytes']
+ modification_date = item['last_modified']
+ etag = item['hash']
+ storage_class = 'STANDARD'
+ except KeyError, e:
+ raise OpenStackError(str(e))
+ else:
+ return ContainerItem(key, modification_date, etag, size, storage_class)
+
+ contents = map(_make_containeritem, items)
+ return ContainerListing(self._container_name, prefix, None, 10000, False, contents=contents)
+ d.addCallback(_parse_list)
+ return d
+
+ def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+ """
+ Put an object in this bucket.
+ Any existing object of the same name will be replaced.
+ """
+ d = self._auth_client.get_auth_info()
+ def _do_put(auth_info):
+ content_length = len(data)
+ request_headers = {
+ 'User-Agent': [self.USER_AGENT],
+ 'X-Auth-Token': [auth_info.auth_token],
+ 'Content-Type': [content_type],
+ 'Content-Length': [content_length],
+ }
+ producer = FileBodyProducer(StringIO(data))
+ url = self._make_object_url(auth_info, object_name)
+ log.msg(format="OpenStack PUT %(url)s %(content_length)d",
+ url=url, content_length=content_length, level=log.OPERATIONAL)
+ return self._agent.request('PUT', url, Headers(request_headers), producer)
+ d.addCallback(_do_put)
+ def _got_put_response(response):
+ log.msg(format="OpenStack PUT response: %(code)d %(phrase)s",
+ code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+ if response.code < 200 or response.code >= 300:
+ raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
+ response.deliverBody(Discard())
+ d.addCallback(_got_put_response)
+ return d
+
+ def _get_object(self, object_name):
+ """
+ Get an object from this container.
+ """
+ d = self._auth_client.get_auth_info()
+ def _do_get(auth_info):
+ request_headers = {
+ 'User-Agent': [self.USER_AGENT],
+ 'X-Auth-Token': [auth_info.auth_token],
+ }
+ url = self._make_object_url(auth_info, object_name)
+ log.msg(format="OpenStack GET %(url)s", url=url, level=log.OPERATIONAL)
+ return self._agent.request('GET', url, Headers(request_headers), None)
+ d.addCallback(_do_get)
+ def _got_get_response(response):
+ log.msg(format="OpenStack GET response: %(code)d %(phrase)s",
+ code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+ if response.code < 200 or response.code >= 300:
+ raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
+
+ collector = DataCollector()
+ response.deliverBody(collector)
+ return collector.when_done()
+ d.addCallback(_got_get_response)
+ return d
+
+ def _head_object(self, object_name):
+ """
+ Retrieve object metadata only.
+ """
+ raise NotImplementedError
+
+ def _delete_object(self, object_name):
+ """
+ Delete an object from this container.
+ Once deleted, there is no method to restore or undelete an object.
+ """
+ d = self._auth_client.get_auth_info()
+ def _do_delete(auth_info):
+ request_headers = {
+ 'User-Agent': [self.USER_AGENT],
+ 'X-Auth-Token': [auth_info.auth_token],
+ }
+ url = self._make_object_url(auth_info, object_name)
+ log.msg(format="OpenStack DELETE %(url)s", url=url, level=log.OPERATIONAL)
+ return self._agent.request('DELETE', url, Headers(request_headers), None)
+ d.addCallback(_do_delete)
+ def _got_delete_response(response):
+ log.msg(format="OpenStack DELETE response: %(code)d %(phrase)s",
+ code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
+ if response.code < 200 or response.code >= 300:
+ raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
+ response.deliverBody(Discard())
+ d.addCallback(_got_delete_response)
+ return d
def create(self):
- return self._do_request('create bucket', self.client.create, self.container_name)
+ return self._do_request('create container', self._create)
def delete(self):
- return self._do_request('delete bucket', self.client.delete, self.container_name)
+ return self._do_request('delete container', self._delete)
- def list_some_objects(self, **kwargs):
- return self._do_request('list objects', self.client.get_bucket, self.container_name, **kwargs)
+ def list_objects(self, prefix=''):
+ return self._do_request('list objects', self._list_objects, prefix)
def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
- return self._do_request('PUT object', self.client.put_object, self.container_name,
- object_name, data, content_type, metadata)
+ return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
def get_object(self, object_name):
- return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
+ return self._do_request('GET object', self._get_object, object_name)
def head_object(self, object_name):
- return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
+ return self._do_request('HEAD object', self._head_object, object_name)
def delete_object(self, object_name):
- return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
-
- def put_policy(self, policy):
- """
- Set access control policy on a bucket.
- """
- query = self.client.query_factory(
- action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy', data=policy)
- return self._do_request('PUT policy', query.submit)
-
- def get_policy(self):
- query = self.client.query_factory(
- action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy')
- return self._do_request('GET policy', query.submit)
-
- def delete_policy(self):
- query = self.client.query_factory(
- action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy')
- return self._do_request('DELETE policy', query.submit)
+ return self._do_request('DELETE object', self._delete_object, object_name)