From: Itamar Turner-Trauring Date: Thu, 21 Mar 2013 15:34:39 +0000 (-0400) Subject: First pass at implementing the Azure GET/PUT/DELETE. X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/reliability?a=commitdiff_plain;h=3738a6729817f97d83c8cad49626f6c7db7233ea;p=tahoe-lafs%2Ftahoe-lafs.git First pass at implementing the Azure GET/PUT/DELETE. --- diff --git a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py index e06e754e..cfb10e26 100644 --- a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py +++ b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py @@ -173,7 +173,6 @@ class GoogleStorageContainer(CommonContainerMixin): last_modified = element.find(self.NAMESPACE + "LastModified").text etag = element.find(self.NAMESPACE + "ETag").text size = int(element.find(self.NAMESPACE + "Size").text) - storage_class = element.find(self.NAMESPACE + "StorageClass") storage_class = "STANDARD" owner = None # Don't bother parsing this at the moment diff --git a/src/allmydata/storage/backends/cloud/msazure/msazure_container.py b/src/allmydata/storage/backends/cloud/msazure/msazure_container.py index 00561e20..b954c9d2 100644 --- a/src/allmydata/storage/backends/cloud/msazure/msazure_container.py +++ b/src/allmydata/storage/backends/cloud/msazure/msazure_container.py @@ -8,9 +8,18 @@ import urlparse import base64 import hmac import hashlib +import urllib +try: + from xml.etree import cElementTree as ElementTree +except ImportError: + from xml.etree import ElementTree +import time from zope.interface import implements +from twisted.web.http_headers import Headers +from twisted.web.http import datetimeToString + from allmydata.storage.backends.cloud.cloud_common import IContainer, \ ContainerItem, ContainerListing, CommonContainerMixin @@ -23,11 +32,14 @@ class MSAzureStorageContainer(CommonContainerMixin): USER_AGENT = "Tahoe-LAFS Microsoft Azure client" + _time = time.time + def __init__(self, account_name, account_key, container_name, override_reactor=None): CommonContainerMixin.__init__(self, container_name, override_reactor) self._account_name = account_name self._account_key = base64.b64decode(account_key) + self.URI = "https://%s.blob.core.windows.net" % (account_name, ) def _calculate_presignature(self, method, url, headers): """ @@ -38,6 +50,7 @@ class MSAzureStorageContainer(CommonContainerMixin): The HMAC, and formatting into HTTP header, is not done in this layer. """ + headers = Headers(headers) parsed_url = urlparse.urlparse(url) result = method + "\n" # Add standard headers: @@ -95,3 +108,107 @@ class MSAzureStorageContainer(CommonContainerMixin): """ Do an HTTP request with the addition of a authorization header. """ + request_headers["x-ms-date"] = [datetimeToString(self._time())] + request_headers["x-ms-version"] = ["2012-02-12"] + request_headers["Authorization"] = [ + self._calculate_signature(method, url, request_headers)] + return self._http_request(what, method, url, request_headers, body=body, + need_response_body=need_response_body) + + def _parse_item(self, element): + """ + Parse a XML element into a ContainerItem. + """ + key = element.find("Name").text + element = element.find("Properties") + last_modified = element.find("Last-Modified").text + etag = element.find("Etag").text + size = int(element.find("Content-Length").text) + storage_class = "STANDARD" # not sure what it means in this context + owner = None # Don't bother parsing this at the moment + + return ContainerItem(key, last_modified, etag, size, storage_class, + owner) + + def _parse_list(self, data, prefix): + """ + Parse the XML response, converting it into a ContainerListing. + """ + name = self._container_name + marker = None + max_keys = None + is_truncated = "false" + common_prefixes = [] + contents = [] + + root = ElementTree.fromstring(data) + if root.tag != "EnumerationResults": + raise ValueError("Unknown root XML element %s" % (root.tag,)) + for element in root: + tag = element.tag + if tag == "NextMarker": + marker = element.text + elif tag == "Blobs": + for subelement in element: + if subelement.tag == "Blob": + contents.append(self._parse_item(subelement)) + + return ContainerListing(name, prefix, marker, max_keys, is_truncated, + contents, common_prefixes) + + def _list_objects(self, prefix): + """ + List objects in this container with the given prefix. + """ + url = self._make_container_url(self.URI) + url += "?comp=list&restype=container&prefix=" + urllib.quote(prefix, safe='') + d = self._authorized_http_request("MS Azure list objects", 'GET', + url, {}, + body=None, + need_response_body=True) + d.addCallback(lambda (response, body): self._parse_list(body, prefix)) + return d + + def _put_object(self, object_name, data, content_type, metadata): + """ + Put an object into this container. + """ + url = self._make_object_url(self.URI, object_name) + # In theory Agent will add the content length for us, but we need it + # at this layer in order for the HMAC authorization to be calculated + # correctly: + request_headers = {'Content-Length': ["%d" % (len(data),)], + 'Content-Type': [content_type], + } + for key, value in metadata.items(): + request_headers["x-ms-meta-%s" % (key,)] = [value] + + d = self._authorized_http_request("MS Azure PUT object", 'PUT', url, + request_headers, + body=data, need_response_body=False) + d.addCallback(lambda (response, body): body) + return d + + def _get_object(self, object_name): + """ + Get an object from this container. + """ + url = self._make_object_url(self.URI, object_name) + d = self._authorized_http_request("MS Azure GET object", 'GET', + url, {}, + body=None, + need_response_body=True) + d.addCallback(lambda (response, body): body) + return d + + def _delete_object(self, object_name): + """ + Delete an object from this container. + """ + url = self._make_object_url(self.URI, object_name) + d = self._authorized_http_request("MS Azure DELETE object", 'DELETE', + url, {}, + body=None, + need_response_body=False) + d.addCallback(lambda (response, body): body) + return d diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 115c0d19..4a5312bc 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -762,7 +762,27 @@ class GoogleStorageAuthenticationClient(unittest.TestCase): return auth.get_authorization_header().addCallback(gotResult) -class GoogleStorageBackend(unittest.TestCase): +class CloudStorageBackendMixin(object): + """ + Utility functionality for testing cloud storage backends. + """ + class Response(object): + def __init__(self, code, headers={}): + self.code = code + self.headers = headers + + def mock_http_request(self): + """ + Override the container's _http_request with a mock whose result is a + Deferred which can be fired by the caller. + """ + d = defer.Deferred() + self.container._http_request = mock.create_autospec( + self.container._http_request, return_value=d) + return d + + +class GoogleStorageBackend(unittest.TestCase, CloudStorageBackendMixin): """ Tests for the Google Storage API container. @@ -773,11 +793,6 @@ class GoogleStorageBackend(unittest.TestCase): if not googlestorage_container.oauth2client_available: skip = "Google Storage requires oauth2client" - class Response(object): - def __init__(self, code, headers={}): - self.code = code - self.headers = headers - def setUp(self): self.reactor = Clock() class FakeAuthenticationClient(object): @@ -787,16 +802,6 @@ class GoogleStorageBackend(unittest.TestCase): self.container = googlestorage_container.GoogleStorageContainer( self.auth, "123", "thebucket", self.reactor) - def mock_http_request(self): - """ - Override the container's _http_request with a mock whose result is a - Deferred which can be fired by the caller. - """ - d = defer.Deferred() - self.container._http_request = mock.create_autospec( - self.container._http_request, return_value=d) - return d - def test_create(self): """ GoogleStorageContainer.create() sends the appropriate HTTP command to @@ -1052,7 +1057,7 @@ class MSAzureAuthentication(unittest.TestCase): def __init__(self, method, url, headers): from urlparse import urlparse, parse_qs self.headers = [ - (key.lower(), value[0]) for key, value in headers.getAllRawHeaders()] + (key.lower(), value[0]) for key, value in headers.items()] url = urlparse(url) self.path = url.path self.query = url.query @@ -1071,7 +1076,6 @@ class MSAzureAuthentication(unittest.TestCase): If possible, assert the signature calculation matches the Microsoft reference implementation. """ - headers = Headers(headers) self.assertEqual( self.container._calculate_presignature(method, url, headers), result) @@ -1194,6 +1198,239 @@ class MSAzureAuthentication(unittest.TestCase): "z:hello there") +class MSAzureStorageBackendTests(unittest.TestCase, CloudStorageBackendMixin): + """ + Tests for the Microsoft Azure Blob API container. + + All code references in docstrings/comments are to classes/functions in + allmydata.storage.backends.cloud.msazure.msazure_container + unless noted otherwise. + """ + def setUp(self): + self.reactor = Clock() + self.container = msazure_container.MSAzureStorageContainer( + "theaccount", "thekey".encode("base64"), "thebucket", self.reactor) + # Simplify the expected Authorization header: + self.container._calculate_signature = lambda *args: "signature" + self.authorization = "signature" + # Hardcode the time of date header: + self.container._time = lambda: 123 + self.date = "Thu, 01 Jan 1970 00:02:03 GMT" + + def test_list_objects(self): + """ + MSAzureStorageContainer.list_objects() sends the appropriate HTTP + command to list the objects in the bucket, and parses the response to + match the expected result documented in the IContainer interface. + """ + LIST_RESPONSE = """\ + + + 4 + + + xxx xxx/firstblob + http://theaccount.blob.core.windows.net/thebucket/xxx%20xxx/firstblob + + Mon, 30 Jan 2013 01:23:45 GMT + abc + 123 + text/plain + + en-US + + no-cache + BlockBlob + unlocked + + + + xxx xxx/secondblob + http://myaccount.blob.core.windows.net/mycontainer/xxx%20xxx/secondblob + + Mon, 30 Jan 2013 01:23:46 GMT + def + 100 + text/html + + + + no-cache + BlockBlob + unlocked + + + + + + xxx xxx/foo +""" + http_response = self.mock_http_request() + done = [] + self.container.list_objects(prefix='xxx xxx/').addCallback(done.append) + self.assertFalse(done) + self.container._http_request.assert_called_once_with( + "MS Azure list objects", "GET", + "https://theaccount.blob.core.windows.net/thebucket?comp=list&restype=container&prefix=xxx%20xxx%2F", + {"Authorization": [self.authorization], + "x-ms-version": ["2012-02-12"], + "x-ms-date": [self.date], + }, + body=None, + need_response_body=True) + http_response.callback((self.Response(200), LIST_RESPONSE)) + listing = done[0] + self.assertEqual(listing.name, "thebucket") + self.assertEqual(listing.prefix, "xxx xxx/") + self.assertEqual(listing.marker, "xxx xxx/foo") + self.assertEqual(listing.max_keys, None) + self.assertEqual(listing.is_truncated, "false") + item1, item2 = listing.contents + self.assertEqual(item1.key, "xxx xxx/firstblob") + self.assertEqual(item1.modification_date, "Mon, 30 Jan 2013 01:23:45 GMT") + self.assertEqual(item1.etag, 'abc') + self.assertEqual(item1.size, 123) + self.assertEqual(item1.owner, None) # meh, who cares + self.assertEqual(item2.key, "xxx xxx/secondblob") + self.assertEqual(item2.modification_date, "Mon, 30 Jan 2013 01:23:46 GMT") + self.assertEqual(item2.etag, 'def') + self.assertEqual(item2.size, 100) + self.assertEqual(item2.owner, None) # meh, who cares + + def test_put_object(self): + """ + MSAzureStorageContainer.put_object() sends the appropriate HTTP command + to upload an object to the bucket, and parses the response to match + the expected result documented in the IContainer interface. + """ + http_response = self.mock_http_request() + done = [] + self.container.put_object("theobj", "the body").addCallback(done.append) + self.assertFalse(done) + self.container._http_request.assert_called_once_with( + "MS Azure PUT object", "PUT", + "https://theaccount.blob.core.windows.net/thebucket/theobj", + {"Authorization": [self.authorization], + "x-ms-version": ["2012-02-12"], + "Content-Type": ["application/octet-stream"], + "Content-Length": [str(len("the body"))], + "x-ms-date": [self.date], + }, + body="the body", + need_response_body=False) + http_response.callback((self.Response(200), None)) + self.assertTrue(done) + + def test_put_object_additional(self): + """ + MSAzureStorageContainer.put_object() sends the appropriate HTTP command + to upload an object to the bucket with custom content type and + metadata, and parses the response to match the expected result + documented in the IContainer interface. + """ + http_response = self.mock_http_request() + done = [] + self.container.put_object("theobj", "the body", + "text/plain", + {"key": "value"}).addCallback(done.append) + self.assertFalse(done) + self.assertFalse(done) + self.container._http_request.assert_called_once_with( + "MS Azure PUT object", "PUT", + "https://theaccount.blob.core.windows.net/thebucket/theobj", + {"Authorization": [self.authorization], + "x-ms-version": ["2012-02-12"], + "Content-Type": ["text/plain"], + "Content-Length": [str(len("the body"))], + "x-ms-meta-key": ["value"], + "x-ms-date": [self.date], + }, + body="the body", + need_response_body=False) + http_response.callback((self.Response(200), None)) + self.assertTrue(done) + + def test_get_object(self): + """ + MSAzureStorageContainer.get_object() sends the appropriate HTTP command + to get an object from the bucket, and parses the response to match the + expected result documented in the IContainer interface. + """ + http_response = self.mock_http_request() + done = [] + self.container.get_object("theobj").addCallback(done.append) + self.assertFalse(done) + self.container._http_request.assert_called_once_with( + "MS Azure GET object", "GET", + "https://theaccount.blob.core.windows.net/thebucket/theobj", + {"Authorization": [self.authorization], + "x-ms-version": ["2012-02-12"], + "x-ms-date": [self.date], + }, + body=None, + need_response_body=True) + http_response.callback((self.Response(200), "the body")) + self.assertEqual(done, ["the body"]) + + def test_delete_object(self): + """ + MSAzureStorageContainer.delete_object() sends the appropriate HTTP + command to delete an object from the bucket, and parses the response + to match the expected result documented in the IContainer interface. + """ + http_response = self.mock_http_request() + done = [] + self.container.delete_object("theobj").addCallback(done.append) + self.assertFalse(done) + self.container._http_request.assert_called_once_with( + "MS Azure DELETE object", "DELETE", + "https://theaccount.blob.core.windows.net/thebucket/theobj", + {"Authorization": [self.authorization], + "x-ms-version": ["2012-02-12"], + "x-ms-date": [self.date], + }, + body=None, + need_response_body=False) + http_response.callback((self.Response(200), None)) + self.assertTrue(done) + + def test_retry(self): + """ + If an HTTP response code is server error or an authentication error, + the request will try again after a delay. + """ + # XXX should maybe be refactored into test for the common base class + # that implments retries... + first, second = defer.Deferred(), defer.Deferred() + self.container._http_request = mock.create_autospec( + self.container._http_request, side_effect=[first, second]) + result = [] + self.container._do_request("test", self.container._http_request, + "test", "GET", "http://example", {}, body=None, + need_response_body=True).addCallback(result.append) + # No response from first request yet: + self.assertFalse(result) + self.assertEqual(self.container._http_request.call_count, 1) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + # First response fails: + first.errback(CloudServiceError(None, 500)) + self.assertFalse(result, result) + self.assertEqual(self.container._http_request.call_count, 1) + self.reactor.advance(0.1) + self.assertEqual(self.container._http_request.call_count, 2) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + # Second response succeeds: + done = object() + second.callback(done) + self.assertEqual(result, [done]) + + class ServerMixin: def allocate(self, account, storage_index, sharenums, size, canary=None): # These secrets are not used, but clients still provide them.