test_introducer.SystemTest: fix race condition
authorBrian Warner <warner@lothar.com>
Sat, 31 Mar 2012 00:29:06 +0000 (17:29 -0700)
committerBrian Warner <warner@lothar.com>
Sat, 31 Mar 2012 00:29:06 +0000 (17:29 -0700)
SystemTest has a couple of different phases, separated by a poller which
waits for everything to be idle (all messages delivered, none in flight). It
does this by watching some internal "_debug_outstanding" counters in the
server and in each client, and waiting for them to hit zero.

Just before the last phase, we replace the server with a new one (to make
sure clients re-send their messages properly). Unfortunately, the polling
function closed over the variable holding the original server, and didn't see
the replacement. It kept polling the old server, and failed to notice the
outstanding messages for the new server. The last phase of the test (check3)
was started too early, which failed (since some messages had not yet been
delivered), and then exploded in a flurry of dirty-reactor errors (because
some messages were delivered after test shutdown).

This replaces the closed-over-variable with a "self.the_introducer", which
seems to fix the race.

One additional place to look at in the future: the client
announcement-receive path (remote_announce) uses an eventually(). If the
message has been received and the eventual-send posted (but not yet executed)
when the poller sees it, the poller might erroneously conclude that the
client is idle and cause the same problem as above. To fix this, the poller
(probably all pollers) could be enhanced to do a flushEventualQueue before
querying the are-we-done-yet predicate function.

src/allmydata/test/test_introducer.py

index b8dd50b364049c1f75dbd97f200bfda133871631..841771efa3b9d901531ef08d41f94a00f5d79a94 100644 (file)
@@ -387,6 +387,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
         received_announcements = {}
         subscribing_clients = []
         publishing_clients = []
+        self.the_introducer = introducer
         privkeys = {}
         expected_announcements = [0 for c in range(NUM_CLIENTS)]
 
@@ -483,7 +484,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 for c in subscribing_clients + publishing_clients:
                     if c._debug_outstanding:
                         return False
-                if introducer._debug_outstanding:
+                if self.the_introducer._debug_outstanding:
                     return False
                 return True
             return self.poll(_idle)
@@ -495,7 +496,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
 
         def _check1(res):
             log.msg("doing _check1")
-            dc = introducer._debug_counts
+            dc = self.the_introducer._debug_counts
             if server_version == V1:
                 # each storage server publishes a record, and (after its
                 # 'subscribe' has been ACKed) also publishes a "stub_client".
@@ -594,11 +595,11 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 c = subscribing_clients[i]
                 for k in c._debug_counts:
                     c._debug_counts[k] = 0
-            for k in introducer._debug_counts:
-                introducer._debug_counts[k] = 0
+            for k in self.the_introducer._debug_counts:
+                self.the_introducer._debug_counts[k] = 0
             expected_announcements[i] += 1 # new 'storage' for everyone
             self.create_tub(self.central_portnum)
-            newfurl = self.central_tub.registerReference(introducer,
+            newfurl = self.central_tub.registerReference(self.the_introducer,
                                                          furlFile=iff)
             assert newfurl == self.introducer_furl
         d.addCallback(_restart_introducer_tub)
@@ -614,7 +615,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
             log.msg("doing _check2")
             # assert that the introducer sent out new messages, one per
             # subscriber
-            dc = introducer._debug_counts
+            dc = self.the_introducer._debug_counts
             self.failUnlessEqual(dc["outbound_announcements"],
                                  NUM_STORAGE*NUM_CLIENTS)
             self.failUnless(dc["outbound_message"] > 0)
@@ -652,7 +653,8 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
                 introducer = old.IntroducerService_v1()
             else:
                 introducer = IntroducerService()
-            newfurl = self.central_tub.registerReference(introducer,
+            self.the_introducer = introducer
+            newfurl = self.central_tub.registerReference(self.the_introducer,
                                                          furlFile=iff)
             assert newfurl == self.introducer_furl
         d.addCallback(_restart_introducer)
@@ -663,7 +665,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
 
         def _check3(res):
             log.msg("doing _check3")
-            dc = introducer._debug_counts
+            dc = self.the_introducer._debug_counts
             self.failUnlessEqual(dc["outbound_announcements"],
                                  NUM_STORAGE*NUM_CLIENTS)
             self.failUnless(dc["outbound_message"] > 0)