From: Brian Warner <warner@allmydata.com>
Date: Wed, 23 Apr 2008 22:05:39 +0000 (-0700)
Subject: introducer: only record one announcement per (tubid,service) tuple. Fixes #343.
X-Git-Tag: allmydata-tahoe-1.1.0~186
X-Git-Url: https://git.rkrishnan.org/components/%22news.html/provisioning?a=commitdiff_plain;h=186492e620f6bb628da4a97ab7667161ec0520d4;p=tahoe-lafs%2Ftahoe-lafs.git

introducer: only record one announcement per (tubid,service) tuple. Fixes #343.
---

diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py
index 6b55a406..ddbc3b14 100644
--- a/src/allmydata/introducer.py
+++ b/src/allmydata/introducer.py
@@ -41,6 +41,13 @@ class IntroducerNode(node.Node):
         ws = IntroducerWebishServer(webport, nodeurl_path)
         self.add_service(ws)
 
+def make_index(announcement):
+    (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+    m = re.match(r'pb://(\w+)@', furl)
+    assert m
+    nodeid = b32decode(m.group(1).upper())
+    return (nodeid, service_name)
+
 class IntroducerService(service.MultiService, Referenceable):
     implements(RIIntroducerPublisherAndSubscriberService)
     name = "introducer"
@@ -48,7 +55,8 @@ class IntroducerService(service.MultiService, Referenceable):
     def __init__(self, basedir="."):
         service.MultiService.__init__(self)
         self.introducer_url = None
-        self._announcements = {} # dict of (announcement)->timestamp
+        # 'index' is (tubid, service_name)
+        self._announcements = {} # dict of index -> (announcement, timestamp)
         self._subscribers = {} # dict of (rref->timestamp) dicts
 
     def log(self, *args, **kwargs):
@@ -63,11 +71,16 @@ class IntroducerService(service.MultiService, Referenceable):
 
     def remote_publish(self, announcement):
         self.log("introducer: announcement published: %s" % (announcement,) )
+        index = make_index(announcement)
+        if index in self._announcements:
+            (old_announcement, timestamp) = self._announcements[index]
+            if old_announcement == announcement:
+                self.log("but we already knew it, ignoring", level=log.NOISY)
+                return
+            else:
+                self.log("old announcement being updated", level=log.NOISY)
+        self._announcements[index] = (announcement, time.time())
         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-        if announcement in self._announcements:
-            self.log("but we already knew it, ignoring", level=log.NOISY)
-            return
-        self._announcements[announcement] = time.time()
         for s in self._subscribers.get(service_name, []):
             s.callRemote("announce", set([announcement]))
 
@@ -88,9 +101,9 @@ class IntroducerService(service.MultiService, Referenceable):
             subscribers.pop(subscriber, None)
         subscriber.notifyOnDisconnect(_remove)
 
-        announcements = set( [ a
-                               for a in self._announcements
-                               if a[1] == service_name ] )
+        announcements = set( [ ann
+                               for idx,(ann,when) in self._announcements.items()
+                               if idx[1] == service_name] )
         d = subscriber.callRemote("announce", announcements)
         d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
 
@@ -126,7 +139,6 @@ class RemoteServiceConnector:
         self._nodeid = b32decode(m.group(1).upper())
         self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
 
-        self._index = (self._nodeid, service_name)
         self.service_name = service_name
 
         self.log("attempting to connect to %s" % self._nodeid_s)
@@ -142,9 +154,6 @@ class RemoteServiceConnector:
     def log(self, *args, **kwargs):
         return self._ic.log(*args, **kwargs)
 
-    def get_index(self):
-        return self._index
-
     def startConnecting(self):
         self._reconnector = self._tub.connectTo(self._furl, self._got_service)
 
@@ -297,10 +306,11 @@ class IntroducerClient(service.Service, Referenceable):
 
     def _new_announcement(self, announcement):
         # this will only be called for new announcements
-        rsc = RemoteServiceConnector(announcement, self._tub, self)
-        index = rsc.get_index()
+        index = make_index(announcement)
         if index in self._connectors:
+            self.log("replacing earlier announcement", level=log.NOISY)
             self._connectors[index].stopConnecting()
+        rsc = RemoteServiceConnector(announcement, self._tub, self)
         self._connectors[index] = rsc
         rsc.startConnecting()
 
diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py
index 4d115518..53afbda7 100644
--- a/src/allmydata/test/test_introducer.py
+++ b/src/allmydata/test/test_introducer.py
@@ -51,6 +51,26 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin):
         i = IntroducerService()
         i.setServiceParent(self.parent)
 
+    def test_duplicate(self):
+        i = IntroducerService()
+        self.failUnlessEqual(len(i.get_announcements()), 0)
+        self.failUnlessEqual(len(i.get_subscribers()), 0)
+        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
+        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
+        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
+        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
+        ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
+        i.remote_publish(ann1)
+        self.failUnlessEqual(len(i.get_announcements()), 1)
+        self.failUnlessEqual(len(i.get_subscribers()), 0)
+        i.remote_publish(ann2)
+        self.failUnlessEqual(len(i.get_announcements()), 2)
+        self.failUnlessEqual(len(i.get_subscribers()), 0)
+        i.remote_publish(ann1b)
+        self.failUnlessEqual(len(i.get_announcements()), 2)
+        self.failUnlessEqual(len(i.get_subscribers()), 0)
+
+
     def test_system(self):
 
         self.central_tub = tub = Tub()
diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py
index 592ef6bb..5fab0e91 100644
--- a/src/allmydata/web/introweb.py
+++ b/src/allmydata/web/introweb.py
@@ -29,7 +29,7 @@ class IntroducerRoot(rend.Page):
         res["subscription_summary"] = subscription_summary
 
         announcement_summary = {}
-        for ann in i.get_announcements():
+        for (ann,when) in i.get_announcements().values():
             (furl, service_name, ri_name, nickname, ver, oldest) = ann
             if service_name not in announcement_summary:
                 announcement_summary[service_name] = 0
@@ -48,7 +48,7 @@ class IntroducerRoot(rend.Page):
     def render_announcement_summary(self, ctx, data):
         i = IClient(ctx).getServiceNamed("introducer")
         services = {}
-        for ann in i.get_announcements():
+        for (ann,when) in i.get_announcements().values():
             (furl, service_name, ri_name, nickname, ver, oldest) = ann
             if service_name not in services:
                 services[service_name] = 0
@@ -69,7 +69,7 @@ class IntroducerRoot(rend.Page):
     def data_services(self, ctx, data):
         i = IClient(ctx).getServiceNamed("introducer")
         ann = [(since,a)
-               for (a,since) in i.get_announcements().items()
+               for (a,since) in i.get_announcements().values()
                if a[1] != "stub_client"]
         ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
         return ann
@@ -94,7 +94,7 @@ class IntroducerRoot(rend.Page):
         i = IClient(ctx).getServiceNamed("introducer")
         # use the "stub_client" announcements to get information per nodeid
         clients = {}
-        for ann in i.get_announcements():
+        for (ann,when) in i.get_announcements().values():
             if ann[1] != "stub_client":
                 continue
             (furl, service_name, ri_name, nickname, ver, oldest) = ann