from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
+from twisted.web.error import Error
from zope.interface import Interface, implements
from allmydata.interfaces import IShareBase
pass
+class CloudServiceError(Error):
+ """
+ A error class similar to txaws' S3Error.
+ """
+ def __init__(self, xml_bytes, status, message=None, response=None, request_id="", host_id=""):
+ Error.__init__(self, status, message, response)
+ self.original = xml_bytes
+ self.status = str(status)
+ self.message = str(message)
+ self.request_id = request_id
+ self.host_id = host_id
+
+ def get_error_code(self):
+ return self.status
+
+ def get_error_message(self):
+ return self.message
+
+ def parse(self, xml_bytes=""):
+ raise NotImplementedError
+
+ def has_error(self, errorString):
+ raise NotImplementedError
+
+ def get_error_codes(self):
+ raise NotImplementedError
+
+ def get_error_messages(self):
+ raise NotImplementedError
+
+
+# Originally from txaws.s3.model (under different class names), which was under the MIT / Expat licence.
+
+class ContainerItem(object):
+ """
+ An item in a listing of cloud objects.
+ """
+ def __init__(self, key, modification_date, etag, size, storage_class,
+ owner=None):
+ self.key = key
+ self.modification_date = modification_date
+ self.etag = etag
+ self.size = size
+ self.storage_class = storage_class
+ self.owner = owner
+
+ def __repr__(self):
+ return "<ContainerItem %r>" % ({
+ "key": self.key,
+ "modification_date": self.modification_date,
+ "etag": self.etag,
+ "size": self.size,
+ "storage_class": self.storage_class,
+ "owner": self.owner,
+ },)
+
+
+class ContainerListing(object):
+ def __init__(self, name, prefix, marker, max_keys, is_truncated,
+ contents=None, common_prefixes=None):
+ self.name = name
+ self.prefix = prefix
+ self.marker = marker
+ self.max_keys = max_keys
+ self.is_truncated = is_truncated
+ self.contents = contents
+ self.common_prefixes = common_prefixes
+
+ def __repr__(self):
+ return "<ContainerListing %r>" % ({
+ "name": self.name,
+ "prefix": self.prefix,
+ "marker": self.marker,
+ "max_keys": self.max_keys,
+ "is_truncated": self.is_truncated,
+ "contents": self.contents,
+ "common_prefixes": self.common_prefixes,
+ })
+
+
BACKOFF_SECONDS_FOR_5XX = (0, 2, 10)
import os.path
from twisted.internet import defer
-from twisted.web.error import Error
from allmydata.util.deferredutil import async_iterate
from zope.interface import implements
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
+ CloudServiceError, ContainerItem, ContainerListing, \
ContainerRetryMixin, ContainerListMixin
from allmydata.util.time_format import iso_utc
from allmydata.util import fileutil
def __init__(self, storagedir):
self._storagedir = storagedir
self.container_name = "MockContainer"
- self.ServiceError = MockServiceError
+ self.ServiceError = CloudServiceError
self._load_count = 0
self._store_count = 0
# This method is also called by tests.
sharefile = os.path.join(self._storagedir, object_name)
if must_exist and not os.path.exists(sharefile):
- raise MockServiceError("", 404, "not found")
+ raise self.ServiceError("", 404, "not found")
return sharefile
def _put_object(self, ign, object_name, data, content_type, metadata):
def get_store_count(self):
return self._store_count
-
-
-class MockServiceError(Error):
- """
- A error class similar to txaws' S3Error.
- """
- def __init__(self, xml_bytes, status, message=None, response=None, request_id="", host_id=""):
- Error.__init__(self, status, message, response)
- self.original = xml_bytes
- self.status = str(status)
- self.message = str(message)
- self.request_id = request_id
- self.host_id = host_id
-
- def get_error_code(self):
- return self.status
-
- def get_error_message(self):
- return self.message
-
- def parse(self, xml_bytes=""):
- raise NotImplementedError
-
- def has_error(self, errorString):
- raise NotImplementedError
-
- def get_error_codes(self):
- raise NotImplementedError
-
- def get_error_messages(self):
- raise NotImplementedError
-
-
-# Originally from txaws.s3.model (under different class names), which was under the MIT / Expat licence.
-
-class ContainerItem(object):
- """
- An item in a listing of cloud objects.
- """
- def __init__(self, key, modification_date, etag, size, storage_class,
- owner=None):
- self.key = key
- self.modification_date = modification_date
- self.etag = etag
- self.size = size
- self.storage_class = storage_class
- self.owner = owner
-
- def __repr__(self):
- return "<ContainerItem %r>" % ({
- "key": self.key,
- "modification_date": self.modification_date,
- "etag": self.etag,
- "size": self.size,
- "storage_class": self.storage_class,
- "owner": self.owner,
- },)
-
-
-class ContainerListing(object):
- def __init__(self, name, prefix, marker, max_keys, is_truncated,
- contents=None, common_prefixes=None):
- self.name = name
- self.prefix = prefix
- self.marker = marker
- self.max_keys = max_keys
- self.is_truncated = is_truncated
- self.contents = contents
- self.common_prefixes = common_prefixes
-
- def __repr__(self):
- return "<ContainerListing %r>" % ({
- "name": self.name,
- "prefix": self.prefix,
- "marker": self.marker,
- "max_keys": self.max_keys,
- "is_truncated": self.is_truncated,
- "contents": self.contents,
- "common_prefixes": self.common_prefixes,
- })
from allmydata.util.assertutil import _assert
from allmydata.node import InvalidValueError
from allmydata.storage.backends.cloud.cloud_common import IContainer, \
- ContainerRetryMixin
-
-# move this
-from allmydata.storage.backends.cloud.mock_cloud import ContainerItem, ContainerListing
+ CloudServiceError, ContainerItem, ContainerListing, ContainerRetryMixin
# Enabling this will cause secrets to be logged.
self._delayed.cancel()
-class OpenStackError(Exception):
- pass
-
-
class Discard(Protocol):
# see http://twistedmatrix.com/trac/ticket/5488
def makeConnection(self, producer):
if reason.check(ResponseDone):
eventually_callback(self._done)("".join(self._data))
else:
- def _failed(): raise OpenStackError(reason.getErrorMessage())
+ def _failed(): raise CloudServiceError(reason.getErrorMessage())
eventually_errback(self._done)(defer.execute(_failed))
def when_done(self):
self._container_name = container_name
self._reactor = override_reactor or reactor
self._agent = Agent(self._reactor)
- self.ServiceError = OpenStackError
+ self.ServiceError = CloudServiceError
def __repr__(self):
return ("<%s %r>" % (self.__class__.__name__, self._container_name,))
log.msg(format="OpenStack list GET response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
- raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
- response.code, response.headers)
+ raise self.ServiceError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
collector = DataCollector()
response.deliverBody(collector)
etag = item['hash']
storage_class = 'STANDARD'
except KeyError, e:
- raise OpenStackError(str(e))
+ raise self.ServiceError(str(e))
else:
return ContainerItem(key, modification_date, etag, size, storage_class)
log.msg(format="OpenStack PUT response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
- raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
- response.code, response.headers)
+ raise self.ServiceError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
response.deliverBody(Discard())
d.addCallback(_got_put_response)
return d
log.msg(format="OpenStack GET response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
- raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
- response.code, response.headers)
+ raise self.ServiceError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
collector = DataCollector()
response.deliverBody(collector)
log.msg(format="OpenStack DELETE response: %(code)d %(phrase)s",
code=response.code, phrase=response.phrase, level=log.OPERATIONAL)
if response.code < 200 or response.code >= 300:
- raise OpenStackError("unexpected response code %r %s" % (response.code, response.phrase),
- response.code, response.headers)
+ raise self.ServiceError("unexpected response code %r %s" % (response.code, response.phrase),
+ response.code, response.headers)
response.deliverBody(Discard())
d.addCallback(_got_delete_response)
return d
create_immutable_disk_share, ImmutableDiskShare
from allmydata.storage.backends.disk.mutable import create_mutable_disk_share, MutableDiskShare
from allmydata.storage.backends.cloud.cloud_backend import CloudBackend
-from allmydata.storage.backends.cloud import mock_cloud, cloud_common
-from allmydata.storage.backends.cloud.mock_cloud import MockContainer, MockServiceError, \
+from allmydata.storage.backends.cloud.cloud_common import CloudError, CloudServiceError, \
ContainerItem, ContainerListing
+from allmydata.storage.backends.cloud import mock_cloud, cloud_common
+from allmydata.storage.backends.cloud.mock_cloud import MockContainer
from allmydata.storage.backends.cloud.openstack import openstack_container
from allmydata.storage.bucket import BucketWriter, BucketReader
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir
def call_put_object(self, ign, object_name, data, content_type=None, metadata={}):
t['count'] += 1
if t['count'] <= failure_count:
- return defer.fail(MockServiceError("XML", 500, "Internal error", "response"))
+ return defer.fail(CloudServiceError("XML", 500, "Internal error", "response"))
else:
return old_put_object(self, ign, object_name, data, content_type=content_type, metadata=metadata)
self.patch(MockContainer, '_put_object', call_put_object)