From: Itamar Turner-Trauring Date: Tue, 5 Mar 2013 15:47:37 +0000 (-0500) Subject: googlestorage_container.py: Implement PUT and listing of bucket contents. X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri/reliability?a=commitdiff_plain;h=758decd48bfe440849384ee4ecffafec75550af8;p=tahoe-lafs%2Ftahoe-lafs.git googlestorage_container.py: Implement PUT and listing of bucket contents. --- diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py index 98c552ba..6cacbaf6 100644 --- a/src/allmydata/storage/backends/cloud/cloud_common.py +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -401,7 +401,7 @@ class ContainerRetryMixin: if len(fargs) > 0: retry = self._react_to_error(int(fargs[0])) if retry: - d = task.deferLater(reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs) + d = task.deferLater(self._reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs) d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs) return d diff --git a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py index d272bd3c..75264e23 100644 --- a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py +++ b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py @@ -4,19 +4,26 @@ This requires the oauth2client library: http://code.google.com/p/google-api-python-client/downloads/list """ +import urllib +try: + from xml.etree import cElementTree as ElementTree +except ImportError: + from xml.etree import ElementTree + # Maybe we can make a thing that looks like httplib2.Http but actually uses # Twisted? import httplib2 from twisted.internet.defer import DeferredLock from twisted.internet.threads import deferToThread +from twisted.web.http import UNAUTHORIZED from oauth2client.client import SignedJwtAssertionCredentials from zope.interface import implements from allmydata.storage.backends.cloud.cloud_common import IContainer, \ - CloudServiceError, ContainerItem, ContainerListing, CommonContainerMixin + ContainerItem, ContainerListing, CommonContainerMixin def configure_googlestorage_container(*args): @@ -85,12 +92,19 @@ class GoogleStorageContainer(CommonContainerMixin): USER_AGENT = "Tahoe-LAFS Google Storage client" URI = "https://storage.googleapis.com" + NAMESPACE = "{http://doc.storage.googleapis.com/2010-04-03}" def __init__(self, auth_client, project_id, bucket_name, override_reactor=None): CommonContainerMixin.__init__(self, bucket_name, override_reactor) self._auth_client = auth_client self._project_id = project_id # Only need for bucket creation/deletion + def _react_to_error(self, response_code): + if response_code == UNAUTHORIZED: + return True + else: + return CommonContainerMixin._react_to_error(self, response_code) + def _get_object(self, object_name): """ Get an object from this container. @@ -102,7 +116,7 @@ class GoogleStorageContainer(CommonContainerMixin): "x-goog-api-version": ["2"], } url = self._make_object_url(self.URI, object_name) - return self._http_request("GET object", 'GET', url, request_headers, + return self._http_request("Google Storage GET object", 'GET', url, request_headers, body=None, need_response_body=True) d.addCallback(_do_get) @@ -120,13 +134,98 @@ class GoogleStorageContainer(CommonContainerMixin): "x-goog-api-version": ["2"], } url = self._make_object_url(self.URI, object_name) - return self._http_request("DELETE object", 'DELETE', url, request_headers, + return self._http_request("Google Storage DELETE object", 'DELETE', url, request_headers, body=None, need_response_body=False) d.addCallback(_do_delete) d.addCallback(lambda (response, body): body) return d + def _put_object(self, object_name, data, content_type, metadata): + """ + Put an object into this container. + """ + d = self._auth_client.get_authorization_header() + def _do_put(auth_header): + request_headers = { + 'Authorization': [auth_header], + "x-goog-api-version": ["2"], + "Content-Type": [content_type], + } + for key, value in metadata.items(): + request_headers["x-goog-meta-" + key] = [value] + url = self._make_object_url(self.URI, object_name) + return self._http_request("Google Storage PUT object", 'PUT', url, request_headers, + body=data, + need_response_body=False) + d.addCallback(_do_put) + d.addCallback(lambda (response, body): body) + return d + + def _parse_item(self, element): + """ + Parse a XML element into a ContainerItem. + """ + key = element.find(self.NAMESPACE + "Key").text + last_modified = element.find(self.NAMESPACE + "LastModified").text + etag = element.find(self.NAMESPACE + "ETag").text + size = int(element.find(self.NAMESPACE + "Size").text) + storage_class = element.find(self.NAMESPACE + "StorageClass").text + owner = None # Don't bother parsing this at the moment + + return ContainerItem(key, last_modified, etag, size, storage_class, + owner) + + def _parse_list(self, data, prefix): + """ + Parse the XML response, converting it into a ContainerListing. + """ + name = self._container_name + marker = None + max_keys = None + is_truncated = "false" + common_prefixes = [] + contents = [] + + # Sigh. + ns_len = len(self.NAMESPACE) + + root = ElementTree.fromstring(data) + if root.tag != self.NAMESPACE + "ListBucketResult": + raise ValueError("Unknown root XML element %s" % (root.tag,)) + for element in root: + tag = element.tag[ns_len:] + if tag == "Marker": + marker = element.text + elif tag == "IsTruncated": + is_truncated = element.text + elif tag == "Contents": + contents.append(self._parse_item(element)) + elif tag == "CommonPrefixes": + common_prefixes.append(element.find(self.NAMESPACE + "Prefix").text) + + return ContainerListing(name, prefix, marker, max_keys, is_truncated, + contents, common_prefixes) + + def _list_objects(self, prefix): + """ + List objects in this container with the given prefix. + """ + d = self._auth_client.get_authorization_header() + def _do_list(auth_header): + request_headers = { + 'Authorization': [auth_header], + "x-goog-api-version": ["2"], + } + url = self._make_container_url(self.URI) + url += "?prefix=" + urllib.quote(prefix, safe='') + return self._http_request("Google Storage list objects", 'GET', url, request_headers, + body=None, + need_response_body=True) + d.addCallback(_do_list) + d.addCallback(lambda (response, body): self._parse_list(body, prefix)) + return d + if __name__ == '__main__': from twisted.internet import reactor diff --git a/src/allmydata/storage/backends/cloud/mock_cloud.py b/src/allmydata/storage/backends/cloud/mock_cloud.py index 3cda2ffc..744441f9 100644 --- a/src/allmydata/storage/backends/cloud/mock_cloud.py +++ b/src/allmydata/storage/backends/cloud/mock_cloud.py @@ -1,7 +1,8 @@ import os.path -from twisted.internet import defer +from twisted.internet import defer, reactor + from allmydata.util.deferredutil import async_iterate from zope.interface import implements @@ -36,7 +37,7 @@ class MockContainer(ContainerRetryMixin, ContainerListMixin): self.ServiceError = CloudServiceError self._load_count = 0 self._store_count = 0 - + self._reactor = reactor fileutil.make_dirs(os.path.join(self._storagedir, "shares")) def __repr__(self): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index b690ea04..ff134e02 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -804,7 +804,7 @@ class GoogleStorageBackend(unittest.TestCase): result documented in the IContainer interface. """ raise NotImplementedError() - test_create.todo = "may not be necessary" + test_create.skip = "may not be necessary" def test_delete(self): """ @@ -813,7 +813,7 @@ class GoogleStorageBackend(unittest.TestCase): result documented in the IContainer interface. """ raise NotImplementedError() - test_delete.todo = "may not be necessary" + test_delete.skip = "may not be necessary" def test_list_objects(self): """ @@ -843,7 +843,7 @@ class GoogleStorageBackend(unittest.TestCase): xxx xxx2 - 1234 + 1234 1 2013-01-28T01:23:45.678Z "def" @@ -869,8 +869,8 @@ class GoogleStorageBackend(unittest.TestCase): self.container.list_objects(prefix='xxx xxx').addCallback(done.append) self.assertFalse(done) self.container._http_request.assert_called_once_with( - "list objects", "GET", - "https://storage.googleapis.com/thebucket/?prefix=xxx%20xxx", + "Google Storage list objects", "GET", + "https://storage.googleapis.com/thebucket?prefix=xxx%20xxx", {"Authorization": ["Bearer thetoken"], "x-goog-api-version": ["2"], }, @@ -888,13 +888,13 @@ class GoogleStorageBackend(unittest.TestCase): self.assertEqual(item1.key, "xxx xxx1") self.assertEqual(item1.modification_date, "2013-01-27T01:23:45.678Z") self.assertEqual(item1.etag, '"abc"') - self.assertEqual(item1.size, '123') + self.assertEqual(item1.size, 123) self.assertEqual(item1.storage_class, 'STANDARD') self.assertEqual(item1.owner, None) # meh, who cares self.assertEqual(item2.key, "xxx xxx2") self.assertEqual(item2.modification_date, "2013-01-28T01:23:45.678Z") self.assertEqual(item2.etag, '"def"') - self.assertEqual(item2.size, '456') + self.assertEqual(item2.size, 456) self.assertEqual(item2.storage_class, 'NOTSTANDARD') self.assertEqual(item2.owner, None) # meh, who cares @@ -909,7 +909,7 @@ class GoogleStorageBackend(unittest.TestCase): self.container.put_object("theobj", "the body").addCallback(done.append) self.assertFalse(done) self.container._http_request.assert_called_once_with( - "PUT object", "PUT", + "Google Storage PUT object", "PUT", "https://storage.googleapis.com/thebucket/theobj", {"Authorization": ["Bearer thetoken"], "x-goog-api-version": ["2"], @@ -934,7 +934,7 @@ class GoogleStorageBackend(unittest.TestCase): {"key": "value"}).addCallback(done.append) self.assertFalse(done) self.container._http_request.assert_called_once_with( - "PUT object", "PUT", + "Google Storage PUT object", "PUT", "https://storage.googleapis.com/thebucket/theobj", {"Authorization": ["Bearer thetoken"], "x-goog-api-version": ["2"], @@ -957,7 +957,7 @@ class GoogleStorageBackend(unittest.TestCase): self.container.get_object("theobj").addCallback(done.append) self.assertFalse(done) self.container._http_request.assert_called_once_with( - "GET object", "GET", + "Google Storage GET object", "GET", "https://storage.googleapis.com/thebucket/theobj", {"Authorization": ["Bearer thetoken"], "x-goog-api-version": ["2"], @@ -978,7 +978,7 @@ class GoogleStorageBackend(unittest.TestCase): self.container.delete_object("theobj").addCallback(done.append) self.assertFalse(done) self.container._http_request.assert_called_once_with( - "DELETE object", "DELETE", + "Google Storage DELETE object", "DELETE", "https://storage.googleapis.com/thebucket/theobj", {"Authorization": ["Bearer thetoken"], "x-goog-api-version": ["2"], @@ -993,13 +993,13 @@ class GoogleStorageBackend(unittest.TestCase): If an HTTP response code is server error or an authentication error, the request will try again after a delay. """ - first, second, third = defer.Deferred(), defer.Deferred() + first, second, third = defer.Deferred(), defer.Deferred(), defer.Deferred() self.container._http_request = mock.create_autospec( - self._container._http_request, side_effect=[first, second, third]) + self.container._http_request, side_effect=[first, second, third]) result = [] - self.container_do_request( - "test", "GET", "http://example", {}, - body=None, need_response_body=True).addCallback(result.append) + self.container._do_request("test", self.container._http_request, + "test", "GET", "http://example", {}, body=None, + need_response_body=True).addCallback(result.append) # No response from first request yet: self.assertFalse(result) self.assertEqual(self.container._http_request.call_count, 1) @@ -1008,28 +1008,29 @@ class GoogleStorageBackend(unittest.TestCase): body=None, need_response_body=True) # First response fails: - first.callback((self.Response(500), None)) - self.assertFalse(result) + first.errback(CloudServiceError(None, 500)) + self.assertFalse(result, result) self.assertEqual(self.container._http_request.call_count, 1) - self.reactor.advance(2) + self.reactor.advance(0.1) self.assertEqual(self.container._http_request.call_count, 2) self.container._http_request.assert_called_with( "test", "GET", "http://example", {}, body=None, need_response_body=True) # Second response fails: - second.callback((self.Response(401), None)) # Unauthorized + second.errback(CloudServiceError(None, 401)) # Unauthorized self.assertFalse(result) self.assertEqual(self.container._http_request.call_count, 2) - self.reactor.advance(10) + self.reactor.advance(2) self.assertEqual(self.container._http_request.call_count, 3) self.container._http_request.assert_called_with( "test", "GET", "http://example", {}, body=None, need_response_body=True) # Third response succeeds: - third.callback((self.Response(200), "the result")) - self.assertEqual(result, ["the result"]) + done = object() + third.callback(done) + self.assertEqual(result, [done]) def test_head_object(self): """ @@ -1039,7 +1040,7 @@ class GoogleStorageBackend(unittest.TestCase): interface. """ raise NotImplementedError() - test_head_object.todo = "May not be necessary" + test_head_object.skip = "May not be necessary" class ServerMixin: