From: Daira Hopwood Date: Thu, 18 Apr 2013 18:41:09 +0000 (+0100) Subject: Retry on timeouts, and increase number of persistent HTTP connections. X-Git-Url: https://git.rkrishnan.org/Site/Content/Exhibitors/class-simplejson.JSONDecoder.html?a=commitdiff_plain;h=bfdb165fe23fe31e8a14fb4ba402222f29ef2640;p=tahoe-lafs%2Ftahoe-lafs.git Retry on timeouts, and increase number of persistent HTTP connections. Author: Itamar Turner-Trauring Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py index 258caf7a..ce3d2156 100644 --- a/src/allmydata/storage/backends/cloud/cloud_common.py +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -9,6 +9,7 @@ from twisted.web.error import Error from twisted.web.client import FileBodyProducer, ResponseDone, Agent, HTTPConnectionPool from twisted.web.http_headers import Headers from twisted.internet.protocol import Protocol +from twisted.internet.error import TimeoutError from zope.interface import Interface, implements from allmydata.interfaces import IShareBase @@ -355,8 +356,8 @@ class ContainerRetryMixin: def _retry(f): d2 = self._handle_error(f, 1, None, description, operation, *args, **kwargs) def _trigger_incident(res): - log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s", - arguments=args[:2], kwargs=kwargs, description=description, + log.msg(format="error(s) on cloud container operation: %(description)s %(arguments)s %(kwargs)s %(res)s", + arguments=args[:2], kwargs=kwargs, description=description, res=res, level=log.WEIRD) return res d2.addBoth(_trigger_incident) @@ -365,7 +366,7 @@ class ContainerRetryMixin: return d def _handle_error(self, f, trynum, first_err_and_tb, description, operation, *args, **kwargs): - f.trap(self.ServiceError) + f.trap(self.ServiceError, TimeoutError) # Don't use f.getTracebackObject() since a fake traceback will not do for the 3-arg form of 'raise'. # tb can be None (which is acceptable for 3-arg raise) if we don't have a traceback. @@ -379,8 +380,8 @@ class ContainerRetryMixin: err = CloudError(msg, *fargs) # This should not trigger an incident; we want to do that at the end. - log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(fargs)s", - trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, fargs=repr(fargs), + log.msg(format="try %(trynum)d failed: %(description)s %(arguments)s %(kwargs)s %(ftype)s %(fargs)s", + trynum=trynum, arguments=args_without_data, kwargs=kwargs, description=description, ftype=str(f.value.__class__), fargs=repr(fargs), level=log.INFREQUENT) if first_err_and_tb is None: @@ -392,13 +393,18 @@ class ContainerRetryMixin: (first_err, first_tb) = first_err_and_tb raise first_err.__class__, first_err, first_tb - fargs = f.value.args - if len(fargs) > 0: - retry = self._react_to_error(int(fargs[0])) - if retry: - d = task.deferLater(self._reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs) - d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs) - return d + retry = True + if f.check(self.ServiceError): + fargs = f.value.args + if len(fargs) > 0: + retry = self._react_to_error(int(fargs[0])) + else: + retry = False + + if retry: + d = task.deferLater(self._reactor, BACKOFF_SECONDS_BEFORE_RETRY[trynum-1], operation, *args, **kwargs) + d.addErrback(self._handle_error, trynum+1, first_err_and_tb, description, operation, *args, **kwargs) + return d # If we get an error response for which _react_to_error says we should not retry, # raise that error even if the request was itself a retry. @@ -703,7 +709,9 @@ class CommonContainerMixin(HTTPClientMixin, ContainerRetryMixin): def __init__(self, container_name, override_reactor=None): self._container_name = container_name self._reactor = override_reactor or reactor - self._agent = Agent(self._reactor, pool=HTTPConnectionPool(self._reactor)) + pool = HTTPConnectionPool(self._reactor) + pool.maxPersistentPerHost = 20 + self._agent = Agent(self._reactor, connectTimeout=10, pool=pool) self.ServiceError = CloudServiceError def __repr__(self): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index cbeef872..5dcad03c 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -443,7 +443,7 @@ class OpenStackCloudBackend(ServiceParentMixin, WorkdirMixin, ShouldFailMixin, u protocol.connectionLost(Failure(ResponseDone())) class MockAgent(object): - def __init__(mock_self, reactor, pool=None): + def __init__(mock_self, reactor, pool=None, connectTimeout=None): pass def request(mock_self, method, url, headers, bodyProducer=None): @@ -782,6 +782,87 @@ class CloudStorageBackendMixin(object): return d +class ContainerRetryTests(unittest.TestCase, CloudStorageBackendMixin): + """ + Tests for ContainerRetryMixin. + """ + def setUp(self): + from allmydata.storage.backends.cloud.cloud_common import ContainerRetryMixin + self.reactor = Clock() + self.container = ContainerRetryMixin() + self.container._reactor = self.reactor + self.container.ServiceError = CloudServiceError + # We don't just use mock.Mock, but do this silly thing so we can use + # create_autospec, because create_autospec is the only safe way to use + # mock. + self.container._http_request = (lambda description, method, url, headers, + body=None, need_response_body=False: None) + + def test_retry_response_code(self): + """ + If an HTTP response code is server error or an authentication error, + the request will try again after a delay. + """ + first, second = defer.Deferred(), defer.Deferred() + self.container._http_request = mock.create_autospec( + self.container._http_request, side_effect=[first, second]) + result = [] + self.container._do_request("test", self.container._http_request, + "test", "GET", "http://example", {}, body=None, + need_response_body=True).addCallback(result.append) + # No response from first request yet: + self.assertFalse(result) + self.assertEqual(self.container._http_request.call_count, 1) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + # First response fails: + first.errback(CloudServiceError(None, 500)) + self.assertFalse(result, result) + self.assertEqual(self.container._http_request.call_count, 1) + self.reactor.advance(0.1) + self.assertEqual(self.container._http_request.call_count, 2) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + # Second response succeeds: + done = object() + second.callback(done) + self.assertEqual(result, [done]) + + def test_retry_timeout(self): + """ + If an HTTP connection fails with a timeout, retry. + """ + first, second = defer.Deferred(), defer.Deferred() + self.container._http_request = mock.create_autospec( + self.container._http_request, side_effect=[first, second]) + result = [] + self.container._do_request("test", self.container._http_request, + "test", "GET", "http://example", {}, body=None, + need_response_body=True).addCallback(result.append) + + # No response from first request yet: + self.assertFalse(result) + self.assertEqual(self.container._http_request.call_count, 1) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + # First response fails: + from twisted.internet.error import TimeoutError + first.errback(TimeoutError()) + self.assertFalse(result, result) + self.assertEqual(self.container._http_request.call_count, 1) + self.reactor.advance(0.1) + self.assertEqual(self.container._http_request.call_count, 2) + self.container._http_request.assert_called_with( + "test", "GET", "http://example", {}, + body=None, need_response_body=True) + + class GoogleStorageBackend(unittest.TestCase, CloudStorageBackendMixin): """ Tests for the Google Storage API container. @@ -1413,42 +1494,6 @@ class MSAzureStorageBackendTests(unittest.TestCase, CloudStorageBackendMixin): http_response.callback((self.Response(200), None)) self.assertTrue(done) - def test_retry(self): - """ - If an HTTP response code is server error or an authentication error, - the request will try again after a delay. - """ - # XXX should maybe be refactored into test for the common base class - # that implments retries... - first, second = defer.Deferred(), defer.Deferred() - self.container._http_request = mock.create_autospec( - self.container._http_request, side_effect=[first, second]) - result = [] - self.container._do_request("test", self.container._http_request, - "test", "GET", "http://example", {}, body=None, - need_response_body=True).addCallback(result.append) - # No response from first request yet: - self.assertFalse(result) - self.assertEqual(self.container._http_request.call_count, 1) - self.container._http_request.assert_called_with( - "test", "GET", "http://example", {}, - body=None, need_response_body=True) - - # First response fails: - first.errback(CloudServiceError(None, 500)) - self.assertFalse(result, result) - self.assertEqual(self.container._http_request.call_count, 1) - self.reactor.advance(0.1) - self.assertEqual(self.container._http_request.call_count, 2) - self.container._http_request.assert_called_with( - "test", "GET", "http://example", {}, - body=None, need_response_body=True) - - # Second response succeeds: - done = object() - second.callback(done) - self.assertEqual(result, [done]) - class ServerMixin: def allocate(self, account, storage_index, sharenums, size, canary=None):