BACKOFF_SECONDS_BEFORE_RETRY = (0, 2, 10)
-class ContainerRetryMixin:
+class CommonContainerMixin:
"""
+ Base class for cloud storage providers with similar APIs.
+
I provide a helper method for performing an operation on a cloud container that will retry up to
len(BACKOFF_SECONDS_BEFORE_RETRY) times (not including the initial try). If the initial try fails, a
single incident will be triggered after the operation has succeeded or failed.
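+
+    A minimal usage sketch (ExampleContainer and make_provider_request are hypothetical;
+    the real subclasses supply provider-specific _create, _get_object, and so on):
+
+        class ExampleContainer(CommonContainerMixin):
+            def _create(self):
+                # hypothetical provider-specific request returning a Deferred
+                return make_provider_request()
+
+        d = ExampleContainer("bucket").create()
+        # an error for which _react_to_error() returns True is retried after each delay
+        # in BACKOFF_SECONDS_BEFORE_RETRY before giving up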
"""
+ def __init__(self, container_name, override_reactor=None):
+ self._container_name = container_name
+ self._reactor = override_reactor or reactor
+ self.ServiceError = CloudServiceError
+
+ def __repr__(self):
+ return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
+
+ def _make_container_url(self, public_storage_url):
+ return "%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''))
+
+ def _make_object_url(self, public_storage_url, object_name):
+ return "%s/%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''),
+ urllib.quote(object_name))
+
def _react_to_error(self, response_code):
"""
The default policy is to retry on 5xx errors.
return response_code >= 500 and response_code < 600
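+        # A hypothetical override (the OpenStack container further below reacts to
+        # UNAUTHORIZED in a similar way):
+        #
+        #     def _react_to_error(self, response_code):
+        #         if response_code == 401:
+        #             self._refresh_credentials()   # hypothetical helper
+        #             return True
+        #         return CommonContainerMixin._react_to_error(self, response_code)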
def _strip_data(self, args):
- """
- By default retain only one argument (object_name) for logging.
- Subclasses should override this if more than one argument is safe to log
- (we want to avoid logging data).
- """
+ # Retain only one argument, object_name, for logging (we want to avoid logging data).
return args[:1]
def _do_request(self, description, operation, *args, **kwargs):
log.msg("Giving up, no retry for %s" % (err,))
raise err.__class__, err, tb
+ def create(self):
+ return self._do_request('create container', self._create)
+
+ def delete(self):
+ return self._do_request('delete container', self._delete)
+
+ def list_objects(self, prefix=''):
+ return self._do_request('list objects', self._list_objects, prefix)
+
+ def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+ return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
+
+ def get_object(self, object_name):
+ return self._do_request('GET object', self._get_object, object_name)
+
+ def head_object(self, object_name):
+ return self._do_request('HEAD object', self._head_object, object_name)
+
+ def delete_object(self, object_name):
+ return self._do_request('DELETE object', self._delete_object, object_name)
+
def concat(seqs):
"""
ServiceError:
The error class to trap (CloudServiceError or similar).
"""
+
+ def _init_agent(self):
+ pool = HTTPConnectionPool(self._reactor)
+ pool.maxPersistentPerHost = 20
+ self._agent = Agent(self._reactor, connectTimeout=10, pool=pool)
+
def _http_request(self, what, method, url, request_headers, body=None, need_response_body=False):
# Agent.request adds a Host header automatically based on the URL.
request_headers['User-Agent'] = [self.USER_AGENT]
raise self.ServiceError(None, response.code,
message="missing response header %r" % (name,))
return hs[0]
-
-
-
-class CommonContainerMixin(HTTPClientMixin, ContainerRetryMixin):
- """
- Base class for cloud storage providers with similar APIs.
-
- In particular, OpenStack and Google Storage are very similar (presumably
- since they both copy S3).
- """
-
- def __init__(self, container_name, override_reactor=None):
- self._container_name = container_name
-
- # Maybe this should be in HTTPClientMixin?
- self._reactor = override_reactor or reactor
- pool = HTTPConnectionPool(self._reactor)
- pool.maxPersistentPerHost = 20
- self._agent = Agent(self._reactor, connectTimeout=10, pool=pool)
- self.ServiceError = CloudServiceError
-
- def __repr__(self):
- return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
-
- def _make_container_url(self, public_storage_url):
- return "%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''))
-
- def _make_object_url(self, public_storage_url, object_name):
- return "%s/%s/%s" % (public_storage_url, urllib.quote(self._container_name, safe=''),
- urllib.quote(object_name))
-
- # Consider moving these to ContainerRetryMixin.
-
- def create(self):
- return self._do_request('create container', self._create)
-
- def delete(self):
- return self._do_request('delete container', self._delete)
-
- def list_objects(self, prefix=''):
- return self._do_request('list objects', self._list_objects, prefix)
-
- def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
- return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
-
- def get_object(self, object_name):
- return self._do_request('GET object', self._get_object, object_name)
-
- def head_object(self, object_name):
- return self._do_request('HEAD object', self._head_object, object_name)
-
- def delete_object(self, object_name):
- return self._do_request('DELETE object', self._delete_object, object_name)
from allmydata.util import log
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- ContainerItem, ContainerListing, CommonContainerMixin
+ ContainerItem, ContainerListing, CommonContainerMixin, HTTPClientMixin
class AuthenticationClient(object):
return d
-class GoogleStorageContainer(CommonContainerMixin):
+class GoogleStorageContainer(CommonContainerMixin, HTTPClientMixin):
implements(IContainer)
USER_AGENT = "Tahoe-LAFS Google Storage client"
def __init__(self, auth_client, project_id, bucket_name, override_reactor=None):
CommonContainerMixin.__init__(self, bucket_name, override_reactor)
+ self._init_agent()
self._auth_client = auth_client
self._project_id = project_id # Only need for bucket creation/deletion
from allmydata.util.assertutil import _assert
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
CloudServiceError, ContainerItem, ContainerListing, \
- ContainerRetryMixin, ContainerListMixin
+ CommonContainerMixin, ContainerListMixin
from allmydata.util.time_format import iso_utc
from allmydata.util import fileutil
return defer.execute(_not_implemented)
-class MockContainer(ContainerRetryMixin, ContainerListMixin):
+class MockContainer(ContainerListMixin, CommonContainerMixin):
implements(IContainer)
"""
I represent a mock cloud container that stores its data in the local filesystem.
sharefile = os.path.join(sidir, shnumstr)
yield (sharefile, "%s/%s" % (sikey, shnumstr))
+ def list_some_objects(self, **kwargs):
+ return self._do_request('list objects', self._list_some_objects, **kwargs)
+
def _list_some_objects(self, prefix='', marker=None, max_keys=None):
if max_keys is None:
max_keys = MAX_KEYS
fileutil.remove(self._get_path(object_name, must_exist=True))
return defer.succeed(None)
- # methods that use error handling from ContainerRetryMixin
-
- def create(self):
- return self._do_request('create bucket', self._create)
-
- def delete(self):
- return self._do_request('delete bucket', self._delete)
-
- def list_some_objects(self, **kwargs):
- return self._do_request('list objects', self._list_some_objects, **kwargs)
-
- def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
- return self._do_request('PUT object', self._put_object, object_name, data, content_type, metadata)
-
- def get_object(self, object_name):
- return self._do_request('GET object', self._get_object, object_name)
-
- def head_object(self, object_name):
- return self._do_request('HEAD object', self._head_object, object_name)
-
- def delete_object(self, object_name):
- return self._do_request('DELETE object', self._delete_object, object_name)
-
def reset_load_store_counts(self):
self._load_count = 0
self._store_count = 0
from twisted.web.http import datetimeToString
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- ContainerItem, ContainerListing, CommonContainerMixin
+ ContainerItem, ContainerListing, CommonContainerMixin, HTTPClientMixin
-class MSAzureStorageContainer(CommonContainerMixin):
+class MSAzureStorageContainer(CommonContainerMixin, HTTPClientMixin):
implements(IContainer)
USER_AGENT = "Tahoe-LAFS Microsoft Azure client"
def __init__(self, account_name, account_key, container_name,
override_reactor=None):
CommonContainerMixin.__init__(self, container_name, override_reactor)
+ self._init_agent()
self._account_name = account_name
self._account_key = base64.b64decode(account_key)
self.URI = "https://%s.blob.core.windows.net" % (account_name, )
self._delayed.cancel()
-class OpenStackContainer(CommonContainerMixin):
+class OpenStackContainer(CommonContainerMixin, HTTPClientMixin):
implements(IContainer)
USER_AGENT = "Tahoe-LAFS OpenStack storage client"
def __init__(self, auth_client, container_name, override_reactor=None):
CommonContainerMixin.__init__(self, container_name, override_reactor)
+ self._init_agent()
self._auth_client = auth_client
- self.ServiceError = CloudServiceError
def _react_to_error(self, response_code):
if response_code == UNAUTHORIZED:
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- ContainerRetryMixin, ContainerListMixin
+ CommonContainerMixin, ContainerListMixin
def configure_s3_container(storedir, config):
return S3Container(accesskeyid, secretkey, url, container_name, usertoken, producttoken)
-class S3Container(ContainerRetryMixin, ContainerListMixin):
+class S3Container(ContainerListMixin, CommonContainerMixin):
implements(IContainer)
"""
I represent a real S3 container (bucket), accessed using the txaws library.
"""
- def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None):
+ def __init__(self, access_key, secret_key, url, container_name, usertoken=None, producttoken=None, override_reactor=None):
+        CommonContainerMixin.__init__(self, container_name, override_reactor)
+
# We only depend on txaws when this class is actually instantiated.
from txaws.credentials import AWSCredentials
from txaws.service import AWSServiceEndpoint
query_factory = make_query
self.client = S3Client(creds=creds, endpoint=endpoint, query_factory=query_factory)
- self.container_name = container_name
self.ServiceError = S3Error
- def __repr__(self):
- return ("<%s %r>" % (self.__class__.__name__, self.container_name,))
-
- def _strip_data(self, args):
- # Retain up to two arguments (container_name and object_name) for logging.
- return args[:2]
+ def _create(self):
+        return self.client.create_bucket(self._container_name)
- def create(self):
- return self._do_request('create bucket', self.client.create_bucket, self.container_name)
+ def _delete(self):
+        return self.client.delete_bucket(self._container_name)
- def delete(self):
- return self._do_request('delete bucket', self.client.delete_bucket, self.container_name)
+ def list_some_objects(self, **kwargs):
+ return self._do_request('list objects', self._list_some_objects, **kwargs)
- def _get_bucket(self, container_name, **kwargs):
- d = self.client.get_bucket(container_name, **kwargs)
+ def _list_some_objects(self, **kwargs):
+ d = self.client.get_bucket(self._container_name, **kwargs)
def _err(f):
f.trap(ParseError)
raise self.ServiceError("", 500, "list objects: response body is not valid XML (possibly empty)\n" + str(f))
d.addErrback(_err)
return d
- def list_some_objects(self, **kwargs):
- return self._do_request('list objects', self._get_bucket, self.container_name, **kwargs)
-
- def put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
- return self._do_request('PUT object', self.client.put_object, self.container_name,
- object_name, data, content_type, metadata)
+ def _put_object(self, object_name, data, content_type='application/octet-stream', metadata={}):
+ return self.client.put_object(self._container_name, object_name, data, content_type, metadata)
- def get_object(self, object_name):
- return self._do_request('GET object', self.client.get_object, self.container_name, object_name)
+ def _get_object(self, object_name):
+ return self.client.get_object(self._container_name, object_name)
- def head_object(self, object_name):
- return self._do_request('HEAD object', self.client.head_object, self.container_name, object_name)
+ def _head_object(self, object_name):
+ return self.client.head_object(self._container_name, object_name)
- def delete_object(self, object_name):
- return self._do_request('DELETE object', self.client.delete_object, self.container_name, object_name)
+ def _delete_object(self, object_name):
+ return self.client.delete_object(self._container_name, object_name)
def put_policy(self, policy):
"""
"""
query = self.client.query_factory(
action='PUT', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy', data=policy)
+ bucket=self._container_name, object_name='?policy', data=policy)
return self._do_request('PUT policy', query.submit)
def get_policy(self):
query = self.client.query_factory(
action='GET', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy')
+ bucket=self._container_name, object_name='?policy')
return self._do_request('GET policy', query.submit)
def delete_policy(self):
query = self.client.query_factory(
action='DELETE', creds=self.client.creds, endpoint=self.client.endpoint,
- bucket=self.container_name, object_name='?policy')
+ bucket=self._container_name, object_name='?policy')
return self._do_request('DELETE policy', query.submit)
return d
-class ContainerRetryTests(unittest.TestCase, CloudStorageBackendMixin):
+class CommonContainerTests(unittest.TestCase, CloudStorageBackendMixin):
"""
- Tests for ContainerRetryMixin.
+ Tests for CommonContainerMixin.
"""
def setUp(self):
- from allmydata.storage.backends.cloud.cloud_common import ContainerRetryMixin
+ from allmydata.storage.backends.cloud.cloud_common import CommonContainerMixin
self.reactor = Clock()
- self.container = ContainerRetryMixin()
- self.container._reactor = self.reactor
- self.container.ServiceError = CloudServiceError
+ self.container = CommonContainerMixin("container", self.reactor)
+
# We don't just use mock.Mock, but do this silly thing so we can use
# create_autospec, because create_autospec is the only safe way to use
# mock.