ws = IntroducerWebishServer(webport, nodeurl_path)
self.add_service(ws)
+def make_index(announcement):
+ (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m
+ nodeid = b32decode(m.group(1).upper())
+ return (nodeid, service_name)
+
class IntroducerService(service.MultiService, Referenceable):
implements(RIIntroducerPublisherAndSubscriberService)
name = "introducer"
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
- self._announcements = {} # dict of (announcement)->timestamp
+ # 'index' is (tubid, service_name)
+ self._announcements = {} # dict of index -> (announcement, timestamp)
self._subscribers = {} # dict of (rref->timestamp) dicts
def log(self, *args, **kwargs):
def remote_publish(self, announcement):
self.log("introducer: announcement published: %s" % (announcement,) )
+ index = make_index(announcement)
+ if index in self._announcements:
+ (old_announcement, timestamp) = self._announcements[index]
+ if old_announcement == announcement:
+ self.log("but we already knew it, ignoring", level=log.NOISY)
+ return
+ else:
+ self.log("old announcement being updated", level=log.NOISY)
+ self._announcements[index] = (announcement, time.time())
(furl, service_name, ri_name, nickname, ver, oldest) = announcement
- if announcement in self._announcements:
- self.log("but we already knew it, ignoring", level=log.NOISY)
- return
- self._announcements[announcement] = time.time()
for s in self._subscribers.get(service_name, []):
s.callRemote("announce", set([announcement]))
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
- announcements = set( [ a
- for a in self._announcements
- if a[1] == service_name ] )
+ announcements = set( [ ann
+ for idx,(ann,when) in self._announcements.items()
+ if idx[1] == service_name] )
d = subscriber.callRemote("announce", announcements)
d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
self._nodeid = b32decode(m.group(1).upper())
self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
- self._index = (self._nodeid, service_name)
self.service_name = service_name
self.log("attempting to connect to %s" % self._nodeid_s)
def log(self, *args, **kwargs):
return self._ic.log(*args, **kwargs)
- def get_index(self):
- return self._index
-
def startConnecting(self):
self._reconnector = self._tub.connectTo(self._furl, self._got_service)
def _new_announcement(self, announcement):
# this will only be called for new announcements
- rsc = RemoteServiceConnector(announcement, self._tub, self)
- index = rsc.get_index()
+ index = make_index(announcement)
if index in self._connectors:
+ self.log("replacing earlier announcement", level=log.NOISY)
self._connectors[index].stopConnecting()
+ rsc = RemoteServiceConnector(announcement, self._tub, self)
self._connectors[index] = rsc
rsc.startConnecting()
i = IntroducerService()
i.setServiceParent(self.parent)
+ def test_duplicate(self):
+ i = IntroducerService()
+ self.failUnlessEqual(len(i.get_announcements()), 0)
+ self.failUnlessEqual(len(i.get_subscribers()), 0)
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36106,127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
+ furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@192.168.69.247:36111,127.0.0.1:36106/ttwwoogj2ja2qr2srq4ikjwnl7xfgbra"
+ ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
+ ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
+ ann2 = (furl2, "storage", "RIStorage", "nick2", "ver30", "ver0")
+ i.remote_publish(ann1)
+ self.failUnlessEqual(len(i.get_announcements()), 1)
+ self.failUnlessEqual(len(i.get_subscribers()), 0)
+ i.remote_publish(ann2)
+ self.failUnlessEqual(len(i.get_announcements()), 2)
+ self.failUnlessEqual(len(i.get_subscribers()), 0)
+ i.remote_publish(ann1b)
+ self.failUnlessEqual(len(i.get_announcements()), 2)
+ self.failUnlessEqual(len(i.get_subscribers()), 0)
+
+
def test_system(self):
self.central_tub = tub = Tub()
res["subscription_summary"] = subscription_summary
announcement_summary = {}
- for ann in i.get_announcements():
+ for (ann,when) in i.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann
if service_name not in announcement_summary:
announcement_summary[service_name] = 0
def render_announcement_summary(self, ctx, data):
i = IClient(ctx).getServiceNamed("introducer")
services = {}
- for ann in i.get_announcements():
+ for (ann,when) in i.get_announcements().values():
(furl, service_name, ri_name, nickname, ver, oldest) = ann
if service_name not in services:
services[service_name] = 0
def data_services(self, ctx, data):
i = IClient(ctx).getServiceNamed("introducer")
ann = [(since,a)
- for (a,since) in i.get_announcements().items()
+ for (a,since) in i.get_announcements().values()
if a[1] != "stub_client"]
ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
return ann
i = IClient(ctx).getServiceNamed("introducer")
# use the "stub_client" announcements to get information per nodeid
clients = {}
- for ann in i.get_announcements():
+ for (ann,when) in i.get_announcements().values():
if ann[1] != "stub_client":
continue
(furl, service_name, ri_name, nickname, ver, oldest) = ann