From: Brian Warner Date: Wed, 23 Apr 2008 22:05:39 +0000 (-0700) Subject: introducer: only record one announcement per (tubid,service) tuple. Fixes #343. X-Git-Tag: allmydata-tahoe-1.1.0~186 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/uri//%22%22?a=commitdiff_plain;h=186492e620f6bb628da4a97ab7667161ec0520d4;p=tahoe-lafs%2Ftahoe-lafs.git introducer: only record one announcement per (tubid,service) tuple. Fixes #343. --- diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py index 6b55a406..ddbc3b14 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer.py @@ -41,6 +41,13 @@ class IntroducerNode(node.Node): ws = IntroducerWebishServer(webport, nodeurl_path) self.add_service(ws) +def make_index(announcement): + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + m = re.match(r'pb://(\w+)@', furl) + assert m + nodeid = b32decode(m.group(1).upper()) + return (nodeid, service_name) + class IntroducerService(service.MultiService, Referenceable): implements(RIIntroducerPublisherAndSubscriberService) name = "introducer" @@ -48,7 +55,8 @@ class IntroducerService(service.MultiService, Referenceable): def __init__(self, basedir="."): service.MultiService.__init__(self) self.introducer_url = None - self._announcements = {} # dict of (announcement)->timestamp + # 'index' is (tubid, service_name) + self._announcements = {} # dict of index -> (announcement, timestamp) self._subscribers = {} # dict of (rref->timestamp) dicts def log(self, *args, **kwargs): @@ -63,11 +71,16 @@ class IntroducerService(service.MultiService, Referenceable): def remote_publish(self, announcement): self.log("introducer: announcement published: %s" % (announcement,) ) + index = make_index(announcement) + if index in self._announcements: + (old_announcement, timestamp) = self._announcements[index] + if old_announcement == announcement: + self.log("but we already knew it, ignoring", level=log.NOISY) + return + else: + self.log("old announcement being updated", level=log.NOISY) + self._announcements[index] = (announcement, time.time()) (furl, service_name, ri_name, nickname, ver, oldest) = announcement - if announcement in self._announcements: - self.log("but we already knew it, ignoring", level=log.NOISY) - return - self._announcements[announcement] = time.time() for s in self._subscribers.get(service_name, []): s.callRemote("announce", set([announcement])) @@ -88,9 +101,9 @@ class IntroducerService(service.MultiService, Referenceable): subscribers.pop(subscriber, None) subscriber.notifyOnDisconnect(_remove) - announcements = set( [ a - for a in self._announcements - if a[1] == service_name ] ) + announcements = set( [ ann + for idx,(ann,when) in self._announcements.items() + if idx[1] == service_name] ) d = subscriber.callRemote("announce", announcements) d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) @@ -126,7 +139,6 @@ class RemoteServiceConnector: self._nodeid = b32decode(m.group(1).upper()) self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid) - self._index = (self._nodeid, service_name) self.service_name = service_name self.log("attempting to connect to %s" % self._nodeid_s) @@ -142,9 +154,6 @@ class RemoteServiceConnector: def log(self, *args, **kwargs): return self._ic.log(*args, **kwargs) - def get_index(self): - return self._index - def startConnecting(self): self._reconnector = self._tub.connectTo(self._furl, self._got_service) @@ -297,10 +306,11 @@ class IntroducerClient(service.Service, Referenceable): def _new_announcement(self, announcement): # this will only be called for new announcements - rsc = RemoteServiceConnector(announcement, self._tub, self) - index = rsc.get_index() + index = make_index(announcement) if index in self._connectors: + self.log("replacing earlier announcement", level=log.NOISY) self._connectors[index].stopConnecting() + rsc = RemoteServiceConnector(announcement, self._tub, self) self._connectors[index] = rsc rsc.startConnecting() diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 4d115518..53afbda7 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -51,6 +51,26 @@ class TestIntroducer(unittest.TestCase, testutil.PollMixin): i = IntroducerService() i.setServiceParent(self.parent) + def test_duplicate(self): + i = IntroducerService() + self.failUnlessEqual(len(i.get_announcements()), 0) + self.failUnlessEqual(len(i.get_subscribers()), 0) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" + furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra" + ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") + ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") + ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0") + i.remote_publish(ann1) + self.failUnlessEqual(len(i.get_announcements()), 1) + self.failUnlessEqual(len(i.get_subscribers()), 0) + i.remote_publish(ann2) + self.failUnlessEqual(len(i.get_announcements()), 2) + self.failUnlessEqual(len(i.get_subscribers()), 0) + i.remote_publish(ann1b) + self.failUnlessEqual(len(i.get_announcements()), 2) + self.failUnlessEqual(len(i.get_subscribers()), 0) + + def test_system(self): self.central_tub = tub = Tub() diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py index 592ef6bb..5fab0e91 100644 --- a/src/allmydata/web/introweb.py +++ b/src/allmydata/web/introweb.py @@ -29,7 +29,7 @@ class IntroducerRoot(rend.Page): res["subscription_summary"] = subscription_summary announcement_summary = {} - for ann in i.get_announcements(): + for (ann,when) in i.get_announcements().values(): (furl, service_name, ri_name, nickname, ver, oldest) = ann if service_name not in announcement_summary: announcement_summary[service_name] = 0 @@ -48,7 +48,7 @@ class IntroducerRoot(rend.Page): def render_announcement_summary(self, ctx, data): i = IClient(ctx).getServiceNamed("introducer") services = {} - for ann in i.get_announcements(): + for (ann,when) in i.get_announcements().values(): (furl, service_name, ri_name, nickname, ver, oldest) = ann if service_name not in services: services[service_name] = 0 @@ -69,7 +69,7 @@ class IntroducerRoot(rend.Page): def data_services(self, ctx, data): i = IClient(ctx).getServiceNamed("introducer") ann = [(since,a) - for (a,since) in i.get_announcements().items() + for (a,since) in i.get_announcements().values() if a[1] != "stub_client"] ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) return ann @@ -94,7 +94,7 @@ class IntroducerRoot(rend.Page): i = IClient(ctx).getServiceNamed("introducer") # use the "stub_client" announcements to get information per nodeid clients = {} - for ann in i.get_announcements(): + for (ann,when) in i.get_announcements().values(): if ann[1] != "stub_client": continue (furl, service_name, ri_name, nickname, ver, oldest) = ann