from twisted.web.client import FileBodyProducer, ResponseDone, Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.internet.protocol import Protocol
+from twisted.internet.error import TimeoutError
from zope.interface import Interface, implements
from allmydata.interfaces import IShareBase
def _retry(f):
d2 = self._handle_error(f, 1, None, description, operation, *args, **kwargs)
def _trigger_incident(res):
- log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s",
- arguments=args[:2], kwargs=kwargs, description=description,
+ log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s %(res)s",
+ arguments=args[:2], kwargs=kwargs, description=description, res=res,
level=log.WEIRD)
return res
d2.addBoth(_trigger_incident)
return d
def _handle_error(self, f, trynum, first_err_and_tb, description, operation, *args, **kwargs):
- f.trap(self.ServiceError)
+ f.trap(self.ServiceError, TimeoutError)
# Don't use f.getTracebackObject() since a fake traceback will not do for the 3-arg form of 'raise'.
# tb can be None (which is acceptable for 3-arg raise) if we don't have a traceback.
err = CloudError(msg, *fargs)
# This should not trigger an incident; we want to do that at the end.
- log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(fargs)s",
- trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, fargs=repr(fargs),
+ log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(ftype)s %(fargs)s",
+ trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, ftype=str(f.value.__class__), fargs=repr(fargs),
level=log.INFREQUENT)
if first_err_and_tb is None:
(first_err, first_tb) = first_err_and_tb
raise first_err.__class__, first_err, first_tb
- fargs = f.value.args
- if len(fargs) > 0:
- retry = self._react_to_error(int(fargs[0]))
- if retry:
- d = task.deferLater(self._reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs)
- d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs)
- return d
+ retry = True
+ if f.check(self.ServiceError):
+ fargs = f.value.args
+ if len(fargs) > 0:
+ retry = self._react_to_error(int(fargs[0]))
+ else:
+ retry = False
+
+ if retry:
+ d = task.deferLater(self._reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs)
+ d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs)
+ return d
# If we get an error response for which _react_to_error says we should not retry,
# raise that error even if the request was itself a retry.
def __init__(self, container_name, override_reactor=None):
self._container_name = container_name
self._reactor = override_reactor or reactor
- self._agent = Agent(self._reactor, pool=HTTPConnectionPool(self._reactor))
+ pool = HTTPConnectionPool(self._reactor)
+ pool.maxPersistentPerHost = 20
+ self._agent = Agent(self._reactor, connectTimeout=10, pool=pool)
self.ServiceError = CloudServiceError
def __repr__(self):
protocol.connectionLost(Failure(ResponseDone()))
class MockAgent(object):
- def __init__(mock_self, reactor, pool=None):
+ def __init__(mock_self, reactor, pool=None, connectTimeout=None):
pass
def request(mock_self, method, url, headers, bodyProducer=None):
return d
+class ContainerRetryTests(unittest.TestCase, CloudStorageBackendMixin):
+ """
+ Tests for ContainerRetryMixin.
+ """
+ def setUp(self):
+ from allmydata.storage.backends.cloud.cloud_common import ContainerRetryMixin
+ self.reactor = Clock()
+ self.container = ContainerRetryMixin()
+ self.container._reactor = self.reactor
+ self.container.ServiceError = CloudServiceError
+ # We don't just use mock.Mock, but do this silly thing so we can use
+ # create_autospec, because create_autospec is the only safe way to use
+ # mock.
+ self.container._http_request = (lambda description, method, url, headers,
+ body=None, need_response_body=False: None)
+
+ def test_retry_response_code(self):
+ """
+ If an HTTP response code is server error or an authentication error,
+ the request will try again after a delay.
+ """
+ first, second = defer.Deferred(), defer.Deferred()
+ self.container._http_request = mock.create_autospec(
+ self.container._http_request, side_effect=[first, second])
+ result = []
+ self.container._do_request("test", self.container._http_request,
+ "test", "GET", "http://example", {}, body=None,
+ need_response_body=True).addCallback(result.append)
+ # No response from first request yet:
+ self.assertFalse(result)
+ self.assertEqual(self.container._http_request.call_count, 1)
+ self.container._http_request.assert_called_with(
+ "test", "GET", "http://example", {},
+ body=None, need_response_body=True)
+
+ # First response fails:
+ first.errback(CloudServiceError(None, 500))
+ self.assertFalse(result, result)
+ self.assertEqual(self.container._http_request.call_count, 1)
+ self.reactor.advance(0.1)
+ self.assertEqual(self.container._http_request.call_count, 2)
+ self.container._http_request.assert_called_with(
+ "test", "GET", "http://example", {},
+ body=None, need_response_body=True)
+
+ # Second response succeeds:
+ done = object()
+ second.callback(done)
+ self.assertEqual(result, [done])
+
+ def test_retry_timeout(self):
+ """
+ If an HTTP connection fails with a timeout, retry.
+ """
+ first, second = defer.Deferred(), defer.Deferred()
+ self.container._http_request = mock.create_autospec(
+ self.container._http_request, side_effect=[first, second])
+ result = []
+ self.container._do_request("test", self.container._http_request,
+ "test", "GET", "http://example", {}, body=None,
+ need_response_body=True).addCallback(result.append)
+
+ # No response from first request yet:
+ self.assertFalse(result)
+ self.assertEqual(self.container._http_request.call_count, 1)
+ self.container._http_request.assert_called_with(
+ "test", "GET", "http://example", {},
+ body=None, need_response_body=True)
+
+ # First response fails:
+ from twisted.internet.error import TimeoutError
+ first.errback(TimeoutError())
+ self.assertFalse(result, result)
+ self.assertEqual(self.container._http_request.call_count, 1)
+ self.reactor.advance(0.1)
+ self.assertEqual(self.container._http_request.call_count, 2)
+ self.container._http_request.assert_called_with(
+ "test", "GET", "http://example", {},
+ body=None, need_response_body=True)
+
+
class GoogleStorageBackend(unittest.TestCase, CloudStorageBackendMixin):
"""
Tests for the Google Storage API container.
http_response.callback((self.Response(200), None))
self.assertTrue(done)
- def test_retry(self):
- """
- If an HTTP response code is server error or an authentication error,
- the request will try again after a delay.
- """
- # XXX should maybe be refactored into test for the common base class
- # that implments retries...
- first, second = defer.Deferred(), defer.Deferred()
- self.container._http_request = mock.create_autospec(
- self.container._http_request, side_effect=[first, second])
- result = []
- self.container._do_request("test", self.container._http_request,
- "test", "GET", "http://example", {}, body=None,
- need_response_body=True).addCallback(result.append)
- # No response from first request yet:
- self.assertFalse(result)
- self.assertEqual(self.container._http_request.call_count, 1)
- self.container._http_request.assert_called_with(
- "test", "GET", "http://example", {},
- body=None, need_response_body=True)
-
- # First response fails:
- first.errback(CloudServiceError(None, 500))
- self.assertFalse(result, result)
- self.assertEqual(self.container._http_request.call_count, 1)
- self.reactor.advance(0.1)
- self.assertEqual(self.container._http_request.call_count, 2)
- self.container._http_request.assert_called_with(
- "test", "GET", "http://example", {},
- body=None, need_response_body=True)
-
- # Second response succeeds:
- done = object()
- second.callback(done)
- self.assertEqual(result, [done])
-
class ServerMixin:
def allocate(self, account, storage_index, sharenums, size, canary=None):