From 293a33fa73d802fdac946f60db151308c5b58b74 Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Thu, 27 Mar 2014 17:45:19 +0000 Subject: [PATCH] Fix #2206. Includes refactoring of ContainerRetryMixin into CommonContainerMixin, and rearrangement of the code to initialize HTTPClientMixin. Signed-off-by: Daira Hopwood --- .../storage/backends/cloud/cloud_common.py | 105 ++++++++---------- .../googlestorage/googlestorage_container.py | 5 +- .../storage/backends/cloud/mock_cloud.py | 30 +---- .../cloud/msazure/msazure_container.py | 5 +- .../cloud/openstack/openstack_container.py | 4 +- .../storage/backends/cloud/s3/s3_container.py | 55 ++++----- src/allmydata/test/test_storage.py | 11 +- 7 files changed, 88 insertions(+), 127 deletions(-) diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py index 93c3fc50..c3d06068 100644 --- a/src/allmydata/storage/backends/cloud/cloud_common.py +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -333,8 +333,10 @@ class ContainerListing(object): BACKOFF_SECONDS_BEFORE_RETRY = (0, 2, 10) -class ContainerRetryMixin: +class CommonContainerMixin: """ + Base class for cloud storage providers with similar APIs. + I provide a helper method for performing an operation on a cloud container that will retry up to len(BACKOFF_SECONDS_FOR_RETRY) times (not including the initial try). If the initial try fails, a single incident will be triggered after the operation has succeeded or failed. @@ -350,6 +352,21 @@ class ContainerRetryMixin: Returns True if the error should be retried. May perform side effects before the retry. 
""" + def __init__(self, container_name, override_reactor=None): + self._container_name = container_name + self._reactor = override_reactor or reactor + self.ServiceError = CloudServiceError + + def __repr__(self): + return ("<%s %r>" % (self.__class__.__name__, self._container_name,)) + + def _make_container_url(self, public_storage_url): + return "%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe='')) + + def _make_object_url(self, public_storage_url, object_name): + return "%s/%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''), + urllib.quote(object_name)) + def _react_to_error(self, response_code): """ The default policy is to retry on 5xx errors. @@ -357,11 +374,7 @@ class ContainerRetryMixin: return response_code >= 500 and response_code < 600 def _strip_data(self, args): - """ - By default retain only one argument (object_name) for logging. - Subclasses should override this if more than one argument is safe to log - (we want to avoid logging data). - """ + # Retain only one argument, object_name, for logging (we want to avoid logging data). 
return args[:1] def _do_request(self, description, operation, *args, **kwargs): @@ -423,6 +436,27 @@ class ContainerRetryMixin: log.msg("Giving up, no retry for %s" % (err,)) raise err.__class__, err, tb + def create(self): + return self._do_request('create container', self._create) + + def delete(self): + return self._do_request('delete container', self._delete) + + def list_objects(self, prefix=''): + return self._do_request('list objects', self._list_objects, prefix) + + def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): + return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata) + + def get_object(self, object_name): + return self._do_request('GET object', self._get_object, object_name) + + def head_object(self, object_name): + return self._do_request('HEAD object', self._head_object, object_name) + + def delete_object(self, object_name): + return self._do_request('DELETE object', self._delete_object, object_name) + def concat(seqs): """ @@ -676,6 +710,12 @@ class HTTPClientMixin: ServiceError: The error class to trap (CloudServiceError or similar). """ + + def _init_agent(self): + pool = HTTPConnectionPool(self._reactor) + pool.maxPersistentPerHost = 20 + self._agent = Agent(self._reactor, connectTimeout=10, pool=pool) + def _http_request(self, what, method, url, request_headers, body=None, need_response_body=False): # Agent.request adds a Host header automatically based on the URL. request_headers['User-Agent'] = [self.USER_AGENT] @@ -719,56 +759,3 @@ class HTTPClientMixin: raise self.ServiceError(None, response.code, message="missing response header %r" % (name,)) return hs[0] - - - -class CommonContainerMixin(HTTPClientMixin, ContainerRetryMixin): - """ - Base class for cloud storage providers with similar APIs. - - In particular, OpenStack and Google Storage are very similar (presumably - since they both copy S3). 
- """ - - def __init__(self, container_name, override_reactor=None): - self._container_name = container_name - - # Maybe this should be in HTTPClientMixin? - self._reactor = override_reactor or reactor - pool = HTTPConnectionPool(self._reactor) - pool.maxPersistentPerHost = 20 - self._agent = Agent(self._reactor, connectTimeout=10, pool=pool) - self.ServiceError = CloudServiceError - - def __repr__(self): - return ("<%s %r>" % (self.__class__.__name__, self._container_name,)) - - def _make_container_url(self, public_storage_url): - return "%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe='')) - - def _make_object_url(self, public_storage_url, object_name): - return "%s/%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''), - urllib.quote(object_name)) - - # Consider moving these to ContainerRetryMixin. - - def create(self): - return self._do_request('create container', self._create) - - def delete(self): - return self._do_request('delete container', self._delete) - - def list_objects(self, prefix=''): - return self._do_request('list objects', self._list_objects, prefix) - - def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): - return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata) - - def get_object(self, object_name): - return self._do_request('GET object', self._get_object, object_name) - - def head_object(self, object_name): - return self._do_request('HEAD object', self._head_object, object_name) - - def delete_object(self, object_name): - return self._do_request('DELETE object', self._delete_object, object_name) diff --git a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py index 14d28d66..6294f4a3 100644 --- a/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py +++ 
b/src/allmydata/storage/backends/cloud/googlestorage/googlestorage_container.py @@ -30,7 +30,7 @@ from zope.interface import implements from allmydata.util import log from allmydata.storage.backends.cloud.cloud_common import IContainer, \ - ContainerItem, ContainerListing, CommonContainerMixin + ContainerItem, ContainerListing, CommonContainerMixin, HTTPClientMixin class AuthenticationClient(object): @@ -99,7 +99,7 @@ class AuthenticationClient(object): return d -class GoogleStorageContainer(CommonContainerMixin): +class GoogleStorageContainer(CommonContainerMixin, HTTPClientMixin): implements(IContainer) USER_AGENT = "Tahoe-LAFS Google Storage client" @@ -110,6 +110,7 @@ class GoogleStorageContainer(CommonContainerMixin): def __init__(self, auth_client, project_id, bucket_name, override_reactor=None): CommonContainerMixin.__init__(self, bucket_name, override_reactor) + self._init_agent() self._auth_client = auth_client self._project_id = project_id # Only need for bucket creation/deletion diff --git a/src/allmydata/storage/backends/cloud/mock_cloud.py b/src/allmydata/storage/backends/cloud/mock_cloud.py index c2b834d7..6894e6cc 100644 --- a/src/allmydata/storage/backends/cloud/mock_cloud.py +++ b/src/allmydata/storage/backends/cloud/mock_cloud.py @@ -10,7 +10,7 @@ from zope.interface import implements from allmydata.util.assertutil import _assert from allmydata.storage.backends.cloud.cloud_common import IContainer, \ CloudServiceError, ContainerItem, ContainerListing, \ - ContainerRetryMixin, ContainerListMixin + CommonContainerMixin, ContainerListMixin from allmydata.util.time_format import iso_utc from allmydata.util import fileutil @@ -32,7 +32,7 @@ def hook_create_container(): return defer.execute(_not_implemented) -class MockContainer(ContainerRetryMixin, ContainerListMixin): +class MockContainer(ContainerListMixin, CommonContainerMixin): implements(IContainer) """ I represent a mock cloud container that stores its data in the local filesystem. 
@@ -69,6 +69,9 @@ class MockContainer(ContainerRetryMixin, ContainerListMixin): sharefile = os.path.join(sidir, shnumstr) yield (sharefile, "%s/%s" % (sikey, shnumstr)) + def list_some_objects(self, **kwargs): + return self._do_request('list objects', self._list_some_objects, **kwargs) + def _list_some_objects(self, prefix='', marker=None, max_keys=None): if max_keys is None: max_keys = MAX_KEYS @@ -122,29 +125,6 @@ class MockContainer(ContainerRetryMixin, ContainerListMixin): fileutil.remove(self._get_path(object_name, must_exist=True)) return defer.succeed(None) - # methods that use error handling from ContainerRetryMixin - - def create(self): - return self._do_request('create bucket', self._create) - - def delete(self): - return self._do_request('delete bucket', self._delete) - - def list_some_objects(self, **kwargs): - return self._do_request('list objects', self._list_some_objects, **kwargs) - - def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): - return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata) - - def get_object(self, object_name): - return self._do_request('GET object', self._get_object, object_name) - - def head_object(self, object_name): - return self._do_request('HEAD object', self._head_object, object_name) - - def delete_object(self, object_name): - return self._do_request('DELETE object', self._delete_object, object_name) - def reset_load_store_counts(self): self._load_count = 0 self._store_count = 0 diff --git a/src/allmydata/storage/backends/cloud/msazure/msazure_container.py b/src/allmydata/storage/backends/cloud/msazure/msazure_container.py index ddeb3a10..8b6c987b 100644 --- a/src/allmydata/storage/backends/cloud/msazure/msazure_container.py +++ b/src/allmydata/storage/backends/cloud/msazure/msazure_container.py @@ -22,10 +22,10 @@ from twisted.web.http_headers import Headers from twisted.web.http import datetimeToString from 
allmydata.storage.backends.cloud.cloud_common import IContainer, \ - ContainerItem, ContainerListing, CommonContainerMixin + ContainerItem, ContainerListing, CommonContainerMixin, HTTPClientMixin -class MSAzureStorageContainer(CommonContainerMixin): +class MSAzureStorageContainer(CommonContainerMixin, HTTPClientMixin): implements(IContainer) USER_AGENT = "Tahoe-LAFS Microsoft Azure client" @@ -35,6 +35,7 @@ class MSAzureStorageContainer(CommonContainerMixin): def __init__(self, account_name, account_key, container_name, override_reactor=None): CommonContainerMixin.__init__(self, container_name, override_reactor) + self._init_agent() self._account_name = account_name self._account_key = base64.b64decode(account_key) self.URI = "https://%s.blob.core.windows.net" % (account_name, ) diff --git a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py index e0229958..6e8c323b 100644 --- a/src/allmydata/storage/backends/cloud/openstack/openstack_container.py +++ b/src/allmydata/storage/backends/cloud/openstack/openstack_container.py @@ -235,15 +235,15 @@ class AuthenticationClient(HTTPClientMixin): self._delayed.cancel() -class OpenStackContainer(CommonContainerMixin): +class OpenStackContainer(CommonContainerMixin, HTTPClientMixin): implements(IContainer) USER_AGENT = "Tahoe-LAFS OpenStack storage client" def __init__(self, auth_client, container_name, override_reactor=None): CommonContainerMixin.__init__(self, container_name, override_reactor) + self._init_agent() self._auth_client = auth_client - self.ServiceError = CloudServiceError def _react_to_error(self, response_code): if response_code == UNAUTHORIZED: diff --git a/src/allmydata/storage/backends/cloud/s3/s3_container.py b/src/allmydata/storage/backends/cloud/s3/s3_container.py index 806ae611..aa76c495 100644 --- a/src/allmydata/storage/backends/cloud/s3/s3_container.py +++ 
b/src/allmydata/storage/backends/cloud/s3/s3_container.py @@ -8,7 +8,7 @@ except ImportError: from allmydata.node import InvalidValueError from allmydata.storage.backends.cloud.cloud_common import IContainer, \ - ContainerRetryMixin, ContainerListMixin + CommonContainerMixin, ContainerListMixin def configure_s3_container(storedir, config): @@ -24,13 +24,15 @@ def configure_s3_container(storedir, config): return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken) -class S3Container(ContainerRetryMixin, ContainerListMixin): +class S3Container(ContainerListMixin, CommonContainerMixin): implements(IContainer) """ I represent a real S3 container (bucket), accessed using the txaws library. """ - def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None): + def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None, override_reactor=None): + CommonContainerMixin.__init__(self, container_name, override_reactor) + # We only depend on txaws when this class is actually instantiated. from txaws.credentials import AWSCredentials from txaws.service import AWSServiceEndpoint @@ -54,45 +56,36 @@ class S3Container(ContainerRetryMixin, ContainerListMixin): query_factory = make_query self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory) - self.container_name = container_name self.ServiceError = S3Error - def __repr__(self): - return ("<%s %r>" % (self.__class__.__name__, self.container_name,)) - - def _strip_data(self, args): - # Retain up to two arguments (container_name and object_name) for logging. 
- return args[:2] + def _create(self): + return self.client.create_bucket(self._container_name) - def create(self): - return self._do_request('create bucket', self.client.create_bucket, self.container_name) + def _delete(self): + return self.client.delete_bucket(self._container_name) - def delete(self): - return self._do_request('delete bucket', self.client.delete_bucket, self.container_name) + def list_some_objects(self, **kwargs): + return self._do_request('list objects', self._list_some_objects, **kwargs) - def _get_bucket(self, container_name, **kwargs): - d = self.client.get_bucket(container_name, **kwargs) + def _list_some_objects(self, **kwargs): + d = self.client.get_bucket(self._container_name, **kwargs) def _err(f): f.trap(ParseError) raise self.ServiceError("", 500, "list objects: response body is not valid XML (possibly empty)\n" + f) d.addErrback(_err) return d - def list_some_objects(self, **kwargs): - return self._do_request('list objects', self._get_bucket, self.container_name, **kwargs) - - def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): - return self._do_request('PUT object', self.client.put_object, self.container_name, - object_name, data, content_type, metadata) + def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}): + return self.client.put_object(self._container_name, object_name, data, content_type, metadata) - def get_object(self, object_name): - return self._do_request('GET object', self.client.get_object, self.container_name, object_name) + def _get_object(self, object_name): + return self.client.get_object(self._container_name, object_name) - def head_object(self, object_name): - return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name) + def _head_object(self, object_name): + return self.client.head_object(self._container_name, object_name) - def delete_object(self, object_name): - return self._do_request('DELETE object', 
self.client.delete_object, self.container_name, object_name) + def _delete_object(self, object_name): + return self.client.delete_object(self._container_name, object_name) def put_policy(self, policy): """ @@ -100,17 +93,17 @@ class S3Container(ContainerRetryMixin, ContainerListMixin): """ query = self.client.query_factory( action='PUT', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy', data=policy) + bucket=self._container_name, object_name='?policy', data=policy) return self._do_request('PUT policy', query.submit) def get_policy(self): query = self.client.query_factory( action='GET', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy') + bucket=self._container_name, object_name='?policy') return self._do_request('GET policy', query.submit) def delete_policy(self): query = self.client.query_factory( action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint, - bucket=self.container_name, object_name='?policy') + bucket=self._container_name, object_name='?policy') return self._do_request('DELETE policy', query.submit) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index d8d0ed54..815355b6 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -794,16 +794,15 @@ class CloudStorageBackendMixin(object): return d -class ContainerRetryTests(unittest.TestCase, CloudStorageBackendMixin): +class CommonContainerTests(unittest.TestCase, CloudStorageBackendMixin): """ - Tests for ContainerRetryMixin. + Tests for CommonContainerMixin. 
""" def setUp(self): - from allmydata.storage.backends.cloud.cloud_common import ContainerRetryMixin + from allmydata.storage.backends.cloud.cloud_common import CommonContainerMixin self.reactor = Clock() - self.container = ContainerRetryMixin() - self.container._reactor = self.reactor - self.container.ServiceError = CloudServiceError + self.container = CommonContainerMixin("container", self.reactor) + # We don't just use mock.Mock, but do this silly thing so we can use # create_autospec, because create_autospec is the only safe way to use # mock. -- 2.45.2