From: Brian Warner Date: Sat, 31 Mar 2012 00:29:06 +0000 (-0700) Subject: test_introducer.SystemTest: fix race condition X-Git-Url: https://git.rkrishnan.org/simplejson/components/?a=commitdiff_plain;h=24812905a1da3ada28c0177f755bff62f3228123;p=tahoe-lafs%2Ftahoe-lafs.git test_introducer.SystemTest: fix race condition SystemTest has a couple of different phases, separated by a poller which waits for everything to be idle (all messages delivered, none in flight). It does this by watching some internal "_debug_outstanding" counters in the server and in each client, and waiting for them to hit zero. Just before the last phase, we replace the server with a new one (to make sure clients re-send their messages properly). Unfortunately, the polling function closed over the variable holding the original server, and didn't see the replacement. It kept polling the old server, and failed to notice the outstanding messages for the new server. The last phase of the test (check3) was started too early, which failed (since some messages had not yet been delivered), and then exploded in a flurry of dirty-reactor errors (because some messages were delivered after test shutdown). This replaces the closed-over-variable with a "self.the_introducer", which seems to fix the race. One additional place to look at in the future: the client announcement-receive path (remote_announce) uses an eventually(). If the message has been received and the eventual-send posted (but not yet executed) when the poller sees it, the poller might erroneously conclude that the client is idle and cause the same problem as above. To fix this, the poller (probably all pollers) could be enhanced to do a flushEventualQueue before querying the are-we-done-yet predicate function. --- diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index b8dd50b3..841771ef 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -387,6 +387,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): received_announcements = {} subscribing_clients = [] publishing_clients = [] + self.the_introducer = introducer privkeys = {} expected_announcements = [0 for c in range(NUM_CLIENTS)] @@ -483,7 +484,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): for c in subscribing_clients + publishing_clients: if c._debug_outstanding: return False - if introducer._debug_outstanding: + if self.the_introducer._debug_outstanding: return False return True return self.poll(_idle) @@ -495,7 +496,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _check1(res): log.msg("doing _check1") - dc = introducer._debug_counts + dc = self.the_introducer._debug_counts if server_version == V1: # each storage server publishes a record, and (after its # 'subscribe' has been ACKed) also publishes a "stub_client". @@ -594,11 +595,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase): c = subscribing_clients[i] for k in c._debug_counts: c._debug_counts[k] = 0 - for k in introducer._debug_counts: - introducer._debug_counts[k] = 0 + for k in self.the_introducer._debug_counts: + self.the_introducer._debug_counts[k] = 0 expected_announcements[i] += 1 # new 'storage' for everyone self.create_tub(self.central_portnum) - newfurl = self.central_tub.registerReference(introducer, + newfurl = self.central_tub.registerReference(self.the_introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer_tub) @@ -614,7 +615,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): log.msg("doing _check2") # assert that the introducer sent out new messages, one per # subscriber - dc = introducer._debug_counts + dc = self.the_introducer._debug_counts self.failUnlessEqual(dc["outbound_announcements"], NUM_STORAGE*NUM_CLIENTS) self.failUnless(dc["outbound_message"] > 0) @@ -652,7 +653,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase): introducer = old.IntroducerService_v1() else: introducer = IntroducerService() - newfurl = self.central_tub.registerReference(introducer, + self.the_introducer = introducer + newfurl = self.central_tub.registerReference(self.the_introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer) @@ -663,7 +665,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _check3(res): log.msg("doing _check3") - dc = introducer._debug_counts + dc = self.the_introducer._debug_counts self.failUnlessEqual(dc["outbound_announcements"], NUM_STORAGE*NUM_CLIENTS) self.failUnless(dc["outbound_message"] > 0)