git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
break introducer up into separate modules in the new allmydata.introducer package
authorBrian Warner <warner@allmydata.com>
Wed, 18 Jun 2008 19:24:16 +0000 (12:24 -0700)
committerBrian Warner <warner@allmydata.com>
Wed, 18 Jun 2008 19:24:16 +0000 (12:24 -0700)
src/allmydata/client.py
src/allmydata/introducer.py [deleted file]
src/allmydata/introducer/__init__.py [new file with mode: 0644]
src/allmydata/introducer/client.py [new file with mode: 0644]
src/allmydata/introducer/common.py [new file with mode: 0644]
src/allmydata/introducer/server.py [new file with mode: 0644]
src/allmydata/test/test_client.py
src/allmydata/test/test_introducer.py
src/allmydata/test/test_system.py

index 82c9773576230419bd577e091f70f46f4764ab47..959e55b0b82b9299271a0d231f214a70971121dd 100644 (file)
@@ -17,7 +17,7 @@ from allmydata.download import Downloader
 from allmydata.checker import Checker
 from allmydata.offloaded import Helper
 from allmydata.control import ControlServer
-from allmydata.introducer import IntroducerClient
+from allmydata.introducer.client import IntroducerClient
 from allmydata.util import hashutil, base32, testutil
 from allmydata.filenode import FileNode
 from allmydata.dirnode import NewDirectoryNode
diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer.py
deleted file mode 100644 (file)
index ddbc3b1..0000000
+++ /dev/null
@@ -1,378 +0,0 @@
-
-import re, time, sha, os.path
-from base64 import b32decode
-from zope.interface import implements
-from twisted.application import service
-from foolscap import Referenceable
-from allmydata import node
-from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
-     RIIntroducerSubscriberClient, IIntroducerClient
-from allmydata.util import log, idlib
-
-class IntroducerNode(node.Node):
-    PORTNUMFILE = "introducer.port"
-    NODETYPE = "introducer"
-
-    def __init__(self, basedir="."):
-        node.Node.__init__(self, basedir)
-        self.init_introducer()
-        webport = self.get_config("webport")
-        if webport:
-            self.init_web(webport) # strports string
-
-    def init_introducer(self):
-        introducerservice = IntroducerService(self.basedir)
-        self.add_service(introducerservice)
-
-        d = self.when_tub_ready()
-        def _publish(res):
-            self.introducer_url = self.tub.registerReference(introducerservice,
-                                                             "introducer")
-            self.log(" introducer is at %s" % self.introducer_url)
-            self.write_config("introducer.furl", self.introducer_url + "\n")
-        d.addCallback(_publish)
-        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
-
-    def init_web(self, webport):
-        self.log("init_web(webport=%s)", args=(webport,))
-
-        from allmydata.webish import IntroducerWebishServer
-        nodeurl_path = os.path.join(self.basedir, "node.url")
-        ws = IntroducerWebishServer(webport, nodeurl_path)
-        self.add_service(ws)
-
-def make_index(announcement):
-    (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-    m = re.match(r'pb://(\w+)@', furl)
-    assert m
-    nodeid = b32decode(m.group(1).upper())
-    return (nodeid, service_name)
-
-class IntroducerService(service.MultiService, Referenceable):
-    implements(RIIntroducerPublisherAndSubscriberService)
-    name = "introducer"
-
-    def __init__(self, basedir="."):
-        service.MultiService.__init__(self)
-        self.introducer_url = None
-        # 'index' is (tubid, service_name)
-        self._announcements = {} # dict of index -> (announcement, timestamp)
-        self._subscribers = {} # dict of (rref->timestamp) dicts
-
-    def log(self, *args, **kwargs):
-        if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.introducer"
-        return log.msg(*args, **kwargs)
-
-    def get_announcements(self):
-        return self._announcements
-    def get_subscribers(self):
-        return self._subscribers
-
-    def remote_publish(self, announcement):
-        self.log("introducer: announcement published: %s" % (announcement,) )
-        index = make_index(announcement)
-        if index in self._announcements:
-            (old_announcement, timestamp) = self._announcements[index]
-            if old_announcement == announcement:
-                self.log("but we already knew it, ignoring", level=log.NOISY)
-                return
-            else:
-                self.log("old announcement being updated", level=log.NOISY)
-        self._announcements[index] = (announcement, time.time())
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-        for s in self._subscribers.get(service_name, []):
-            s.callRemote("announce", set([announcement]))
-
-    def remote_subscribe(self, subscriber, service_name):
-        self.log("introducer: subscription[%s] request at %s" % (service_name,
-                                                                 subscriber))
-        if service_name not in self._subscribers:
-            self._subscribers[service_name] = {}
-        subscribers = self._subscribers[service_name]
-        if subscriber in subscribers:
-            self.log("but they're already subscribed, ignoring",
-                     level=log.UNUSUAL)
-            return
-        subscribers[subscriber] = time.time()
-        def _remove():
-            self.log("introducer: unsubscribing[%s] %s" % (service_name,
-                                                           subscriber))
-            subscribers.pop(subscriber, None)
-        subscriber.notifyOnDisconnect(_remove)
-
-        announcements = set( [ ann
-                               for idx,(ann,when) in self._announcements.items()
-                               if idx[1] == service_name] )
-        d = subscriber.callRemote("announce", announcements)
-        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
-
-
-
-class RemoteServiceConnector:
-    """I hold information about a peer service that we want to connect to. If
-    we are connected, I hold the RemoteReference, the peer's address, and the
-    peer's version information. I remember information about when we were
-    last connected to the peer too, even if we aren't currently connected.
-
-    @ivar announcement_time: when we first heard about this service
-    @ivar last_connect_time: when we last established a connection
-    @ivar last_loss_time: when we last lost a connection
-
-    @ivar version: the peer's version, from the most recent announcement
-    @ivar oldest_supported: the peer's oldest supported version, same
-    @ivar nickname: the peer's self-reported nickname, same
-
-    @ivar rref: the RemoteReference, if connected, otherwise None
-    @ivar remote_host: the IAddress, if connected, otherwise None
-    """
-
-    def __init__(self, announcement, tub, ic):
-        self._tub = tub
-        self._announcement = announcement
-        self._ic = ic
-        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-
-        self._furl = furl
-        m = re.match(r'pb://(\w+)@', furl)
-        assert m
-        self._nodeid = b32decode(m.group(1).upper())
-        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
-
-        self.service_name = service_name
-
-        self.log("attempting to connect to %s" % self._nodeid_s)
-        self.announcement_time = time.time()
-        self.last_loss_time = None
-        self.rref = None
-        self.remote_host = None
-        self.last_connect_time = None
-        self.version = ver
-        self.oldest_supported = oldest
-        self.nickname = nickname
-
-    def log(self, *args, **kwargs):
-        return self._ic.log(*args, **kwargs)
-
-    def startConnecting(self):
-        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
-
-    def stopConnecting(self):
-        self._reconnector.stopConnecting()
-
-    def _got_service(self, rref):
-        self.last_connect_time = time.time()
-        self.remote_host = rref.tracker.broker.transport.getPeer()
-
-        self.rref = rref
-        self.log("connected to %s" % self._nodeid_s)
-
-        self._ic.add_connection(self._nodeid, self.service_name, rref)
-
-        rref.notifyOnDisconnect(self._lost, rref)
-
-    def _lost(self, rref):
-        self.log("lost connection to %s" % self._nodeid_s)
-        self.last_loss_time = time.time()
-        self.rref = None
-        self.remote_host = None
-        self._ic.remove_connection(self._nodeid, self.service_name, rref)
-
-    def reset(self):
-        self._reconnector.reset()
-
-
-class IntroducerClient(service.Service, Referenceable):
-    implements(RIIntroducerSubscriberClient, IIntroducerClient)
-
-    def __init__(self, tub, introducer_furl,
-                 nickname, my_version, oldest_supported):
-        self._tub = tub
-        self.introducer_furl = introducer_furl
-
-        self._nickname = nickname
-        self._my_version = my_version
-        self._oldest_supported = oldest_supported
-
-        self._published_announcements = set()
-
-        self._publisher = None
-        self._connected = False
-
-        self._subscribed_service_names = set()
-        self._subscriptions = set() # requests we've actually sent
-        self._received_announcements = set()
-        # TODO: this set will grow without bound, until the node is restarted
-
-        # we only accept one announcement per (peerid+service_name) pair.
-        # This ensures that an upgraded host replaces its previous
-        # announcement. It also means that each peer must have their own Tub
-        # (no sharing), which is slightly weird but consistent with the rest
-        # of the Tahoe codebase.
-        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
-        # self._connections is a set of (peerid, service_name, rref) tuples
-        self._connections = set()
-
-        self.counter = 0 # incremented each time we change state, for tests
-        self.encoding_parameters = None
-
-    def startService(self):
-        service.Service.startService(self)
-        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
-        self._introducer_reconnector = rc
-        def connect_failed(failure):
-            self.log("Initial Introducer connection failed: perhaps it's down",
-                     level=log.WEIRD, failure=failure)
-        d = self._tub.getReference(self.introducer_furl)
-        d.addErrback(connect_failed)
-
-    def _got_introducer(self, publisher):
-        self.log("connected to introducer")
-        self._connected = True
-        self._publisher = publisher
-        publisher.notifyOnDisconnect(self._disconnected)
-        self._maybe_publish()
-        self._maybe_subscribe()
-
-    def _disconnected(self):
-        self.log("bummer, we've lost our connection to the introducer")
-        self._connected = False
-        self._publisher = None
-        self._subscriptions.clear()
-
-    def stopService(self):
-        service.Service.stopService(self)
-        self._introducer_reconnector.stopConnecting()
-        for rsc in self._connectors.itervalues():
-            rsc.stopConnecting()
-
-    def log(self, *args, **kwargs):
-        if "facility" not in kwargs:
-            kwargs["facility"] = "tahoe.introducer"
-        return log.msg(*args, **kwargs)
-
-
-    def publish(self, furl, service_name, remoteinterface_name):
-        ann = (furl, service_name, remoteinterface_name,
-               self._nickname, self._my_version, self._oldest_supported)
-        self._published_announcements.add(ann)
-        self._maybe_publish()
-
-    def subscribe_to(self, service_name):
-        self._subscribed_service_names.add(service_name)
-        self._maybe_subscribe()
-
-    def _maybe_subscribe(self):
-        if not self._publisher:
-            self.log("want to subscribe, but no introducer yet",
-                     level=log.NOISY)
-            return
-        for service_name in self._subscribed_service_names:
-            if service_name not in self._subscriptions:
-                # there is a race here, but the subscription desk ignores
-                # duplicate requests.
-                self._subscriptions.add(service_name)
-                d = self._publisher.callRemote("subscribe", self, service_name)
-                d.addErrback(log.err, facility="tahoe.introducer",
-                             level=log.WEIRD)
-
-    def _maybe_publish(self):
-        if not self._publisher:
-            self.log("want to publish, but no introducer yet", level=log.NOISY)
-            return
-        # this re-publishes everything. The Introducer ignores duplicates
-        for ann in self._published_announcements:
-            d = self._publisher.callRemote("publish", ann)
-            d.addErrback(log.err, facility="tahoe.introducer",
-                         level=log.WEIRD)
-
-
-
-    def remote_announce(self, announcements):
-        for ann in announcements:
-            self.log("received %d announcements" % len(announcements))
-            (furl, service_name, ri_name, nickname, ver, oldest) = ann
-            if service_name not in self._subscribed_service_names:
-                self.log("announcement for a service we don't care about [%s]"
-                         % (service_name,), level=log.WEIRD)
-                continue
-            if ann in self._received_announcements:
-                self.log("ignoring old announcement: %s" % (ann,),
-                         level=log.NOISY)
-                continue
-            self.log("new announcement[%s]: %s" % (service_name, ann))
-            self._received_announcements.add(ann)
-            self._new_announcement(ann)
-
-    def _new_announcement(self, announcement):
-        # this will only be called for new announcements
-        index = make_index(announcement)
-        if index in self._connectors:
-            self.log("replacing earlier announcement", level=log.NOISY)
-            self._connectors[index].stopConnecting()
-        rsc = RemoteServiceConnector(announcement, self._tub, self)
-        self._connectors[index] = rsc
-        rsc.startConnecting()
-
-    def add_connection(self, nodeid, service_name, rref):
-        self._connections.add( (nodeid, service_name, rref) )
-        self.counter += 1
-        # when one connection is established, reset the timers on all others,
-        # to trigger a reconnection attempt in one second. This is intended
-        # to accelerate server connections when we've been offline for a
-        # while. The goal is to avoid hanging out for a long time with
-        # connections to only a subset of the servers, which would increase
-        # the chances that we'll put shares in weird places (and not update
-        # existing shares of mutable files). See #374 for more details.
-        for rsc in self._connectors.values():
-            rsc.reset()
-
-    def remove_connection(self, nodeid, service_name, rref):
-        self._connections.discard( (nodeid, service_name, rref) )
-        self.counter += 1
-
-
-    def get_all_connections(self):
-        return frozenset(self._connections)
-
-    def get_all_connectors(self):
-        return self._connectors.copy()
-
-    def get_all_peerids(self):
-        return frozenset([peerid
-                          for (peerid, service_name, rref)
-                          in self._connections])
-
-    def get_all_connections_for(self, service_name):
-        return frozenset([c
-                          for c in self._connections
-                          if c[1] == service_name])
-
-    def get_permuted_peers(self, service_name, key):
-        """Return an ordered list of (peerid, rref) tuples."""
-
-        results = []
-        for (c_peerid, c_service_name, rref) in self._connections:
-            assert isinstance(c_peerid, str)
-            if c_service_name != service_name:
-                continue
-            permuted = sha.new(key + c_peerid).digest()
-            results.append((permuted, c_peerid, rref))
-
-        results.sort(lambda a,b: cmp(a[0], b[0]))
-        return [ (r[1], r[2]) for r in results ]
-
-
-
-    def remote_set_encoding_parameters(self, parameters):
-        self.encoding_parameters = parameters
-
-    def connected_to_introducer(self):
-        return self._connected
-
-    def debug_disconnect_from_peerid(self, victim_nodeid):
-        # for unit tests: locate and sever all connections to the given
-        # peerid.
-        for (nodeid, service_name, rref) in self._connections:
-            if nodeid == victim_nodeid:
-                rref.tracker.broker.transport.loseConnection()
diff --git a/src/allmydata/introducer/__init__.py b/src/allmydata/introducer/__init__.py
new file mode 100644 (file)
index 0000000..a9cab7e
--- /dev/null
@@ -0,0 +1,9 @@
+
+# This is for compatibility with old .tac files, which reference
+# allmydata.introducer.IntroducerNode
+
+from server import IntroducerNode
+
+# hush pyflakes
+_unused = [IntroducerNode]
+del _unused
diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py
new file mode 100644 (file)
index 0000000..008aff1
--- /dev/null
@@ -0,0 +1,278 @@
+
+import re, time, sha
+from base64 import b32decode
+from zope.interface import implements
+from twisted.application import service
+from foolscap import Referenceable
+from allmydata.interfaces import RIIntroducerSubscriberClient, IIntroducerClient
+from allmydata.util import log, idlib
+from allmydata.introducer.common import make_index
+
+
+class RemoteServiceConnector:
+    """I hold information about a peer service that we want to connect to. If
+    we are connected, I hold the RemoteReference, the peer's address, and the
+    peer's version information. I remember information about when we were
+    last connected to the peer too, even if we aren't currently connected.
+
+    @ivar announcement_time: when we first heard about this service
+    @ivar last_connect_time: when we last established a connection
+    @ivar last_loss_time: when we last lost a connection
+
+    @ivar version: the peer's version, from the most recent announcement
+    @ivar oldest_supported: the peer's oldest supported version, same
+    @ivar nickname: the peer's self-reported nickname, same
+
+    @ivar rref: the RemoteReference, if connected, otherwise None
+    @ivar remote_host: the IAddress, if connected, otherwise None
+    """
+
+    def __init__(self, announcement, tub, ic):
+        self._tub = tub
+        self._announcement = announcement
+        self._ic = ic
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+
+        self._furl = furl
+        m = re.match(r'pb://(\w+)@', furl)
+        assert m
+        self._nodeid = b32decode(m.group(1).upper())
+        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
+
+        self.service_name = service_name
+
+        self.log("attempting to connect to %s" % self._nodeid_s)
+        self.announcement_time = time.time()
+        self.last_loss_time = None
+        self.rref = None
+        self.remote_host = None
+        self.last_connect_time = None
+        self.version = ver
+        self.oldest_supported = oldest
+        self.nickname = nickname
+
+    def log(self, *args, **kwargs):
+        return self._ic.log(*args, **kwargs)
+
+    def startConnecting(self):
+        self._reconnector = self._tub.connectTo(self._furl, self._got_service)
+
+    def stopConnecting(self):
+        self._reconnector.stopConnecting()
+
+    def _got_service(self, rref):
+        self.last_connect_time = time.time()
+        self.remote_host = rref.tracker.broker.transport.getPeer()
+
+        self.rref = rref
+        self.log("connected to %s" % self._nodeid_s)
+
+        self._ic.add_connection(self._nodeid, self.service_name, rref)
+
+        rref.notifyOnDisconnect(self._lost, rref)
+
+    def _lost(self, rref):
+        self.log("lost connection to %s" % self._nodeid_s)
+        self.last_loss_time = time.time()
+        self.rref = None
+        self.remote_host = None
+        self._ic.remove_connection(self._nodeid, self.service_name, rref)
+
+    def reset(self):
+        self._reconnector.reset()
+
+
+class IntroducerClient(service.Service, Referenceable):
+    implements(RIIntroducerSubscriberClient, IIntroducerClient)
+
+    def __init__(self, tub, introducer_furl,
+                 nickname, my_version, oldest_supported):
+        self._tub = tub
+        self.introducer_furl = introducer_furl
+
+        self._nickname = nickname
+        self._my_version = my_version
+        self._oldest_supported = oldest_supported
+
+        self._published_announcements = set()
+
+        self._publisher = None
+        self._connected = False
+
+        self._subscribed_service_names = set()
+        self._subscriptions = set() # requests we've actually sent
+        self._received_announcements = set()
+        # TODO: this set will grow without bound, until the node is restarted
+
+        # we only accept one announcement per (peerid+service_name) pair.
+        # This ensures that an upgraded host replaces its previous
+        # announcement. It also means that each peer must have their own Tub
+        # (no sharing), which is slightly weird but consistent with the rest
+        # of the Tahoe codebase.
+        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
+        # self._connections is a set of (peerid, service_name, rref) tuples
+        self._connections = set()
+
+        self.counter = 0 # incremented each time we change state, for tests
+        self.encoding_parameters = None
+
+    def startService(self):
+        service.Service.startService(self)
+        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+        self._introducer_reconnector = rc
+        def connect_failed(failure):
+            self.log("Initial Introducer connection failed: perhaps it's down",
+                     level=log.WEIRD, failure=failure)
+        d = self._tub.getReference(self.introducer_furl)
+        d.addErrback(connect_failed)
+
+    def _got_introducer(self, publisher):
+        self.log("connected to introducer")
+        self._connected = True
+        self._publisher = publisher
+        publisher.notifyOnDisconnect(self._disconnected)
+        self._maybe_publish()
+        self._maybe_subscribe()
+
+    def _disconnected(self):
+        self.log("bummer, we've lost our connection to the introducer")
+        self._connected = False
+        self._publisher = None
+        self._subscriptions.clear()
+
+    def stopService(self):
+        service.Service.stopService(self)
+        self._introducer_reconnector.stopConnecting()
+        for rsc in self._connectors.itervalues():
+            rsc.stopConnecting()
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+
+    def publish(self, furl, service_name, remoteinterface_name):
+        ann = (furl, service_name, remoteinterface_name,
+               self._nickname, self._my_version, self._oldest_supported)
+        self._published_announcements.add(ann)
+        self._maybe_publish()
+
+    def subscribe_to(self, service_name):
+        self._subscribed_service_names.add(service_name)
+        self._maybe_subscribe()
+
+    def _maybe_subscribe(self):
+        if not self._publisher:
+            self.log("want to subscribe, but no introducer yet",
+                     level=log.NOISY)
+            return
+        for service_name in self._subscribed_service_names:
+            if service_name not in self._subscriptions:
+                # there is a race here, but the subscription desk ignores
+                # duplicate requests.
+                self._subscriptions.add(service_name)
+                d = self._publisher.callRemote("subscribe", self, service_name)
+                d.addErrback(log.err, facility="tahoe.introducer",
+                             level=log.WEIRD)
+
+    def _maybe_publish(self):
+        if not self._publisher:
+            self.log("want to publish, but no introducer yet", level=log.NOISY)
+            return
+        # this re-publishes everything. The Introducer ignores duplicates
+        for ann in self._published_announcements:
+            d = self._publisher.callRemote("publish", ann)
+            d.addErrback(log.err, facility="tahoe.introducer",
+                         level=log.WEIRD)
+
+
+
+    def remote_announce(self, announcements):
+        for ann in announcements:
+            self.log("received %d announcements" % len(announcements))
+            (furl, service_name, ri_name, nickname, ver, oldest) = ann
+            if service_name not in self._subscribed_service_names:
+                self.log("announcement for a service we don't care about [%s]"
+                         % (service_name,), level=log.WEIRD)
+                continue
+            if ann in self._received_announcements:
+                self.log("ignoring old announcement: %s" % (ann,),
+                         level=log.NOISY)
+                continue
+            self.log("new announcement[%s]: %s" % (service_name, ann))
+            self._received_announcements.add(ann)
+            self._new_announcement(ann)
+
+    def _new_announcement(self, announcement):
+        # this will only be called for new announcements
+        index = make_index(announcement)
+        if index in self._connectors:
+            self.log("replacing earlier announcement", level=log.NOISY)
+            self._connectors[index].stopConnecting()
+        rsc = RemoteServiceConnector(announcement, self._tub, self)
+        self._connectors[index] = rsc
+        rsc.startConnecting()
+
+    def add_connection(self, nodeid, service_name, rref):
+        self._connections.add( (nodeid, service_name, rref) )
+        self.counter += 1
+        # when one connection is established, reset the timers on all others,
+        # to trigger a reconnection attempt in one second. This is intended
+        # to accelerate server connections when we've been offline for a
+        # while. The goal is to avoid hanging out for a long time with
+        # connections to only a subset of the servers, which would increase
+        # the chances that we'll put shares in weird places (and not update
+        # existing shares of mutable files). See #374 for more details.
+        for rsc in self._connectors.values():
+            rsc.reset()
+
+    def remove_connection(self, nodeid, service_name, rref):
+        self._connections.discard( (nodeid, service_name, rref) )
+        self.counter += 1
+
+
+    def get_all_connections(self):
+        return frozenset(self._connections)
+
+    def get_all_connectors(self):
+        return self._connectors.copy()
+
+    def get_all_peerids(self):
+        return frozenset([peerid
+                          for (peerid, service_name, rref)
+                          in self._connections])
+
+    def get_all_connections_for(self, service_name):
+        return frozenset([c
+                          for c in self._connections
+                          if c[1] == service_name])
+
+    def get_permuted_peers(self, service_name, key):
+        """Return an ordered list of (peerid, rref) tuples."""
+
+        results = []
+        for (c_peerid, c_service_name, rref) in self._connections:
+            assert isinstance(c_peerid, str)
+            if c_service_name != service_name:
+                continue
+            permuted = sha.new(key + c_peerid).digest()
+            results.append((permuted, c_peerid, rref))
+
+        results.sort(lambda a,b: cmp(a[0], b[0]))
+        return [ (r[1], r[2]) for r in results ]
+
+
+
+    def remote_set_encoding_parameters(self, parameters):
+        self.encoding_parameters = parameters
+
+    def connected_to_introducer(self):
+        return self._connected
+
+    def debug_disconnect_from_peerid(self, victim_nodeid):
+        # for unit tests: locate and sever all connections to the given
+        # peerid.
+        for (nodeid, service_name, rref) in self._connections:
+            if nodeid == victim_nodeid:
+                rref.tracker.broker.transport.loseConnection()
diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py
new file mode 100644 (file)
index 0000000..54f611a
--- /dev/null
@@ -0,0 +1,11 @@
+
+import re
+from base64 import b32decode
+
+def make_index(announcement):
+    (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+    m = re.match(r'pb://(\w+)@', furl)
+    assert m
+    nodeid = b32decode(m.group(1).upper())
+    return (nodeid, service_name)
+
diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py
new file mode 100644 (file)
index 0000000..35c1087
--- /dev/null
@@ -0,0 +1,103 @@
+
+import time, os.path
+from zope.interface import implements
+from twisted.application import service
+from foolscap import Referenceable
+from allmydata import node
+from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService
+from allmydata.util import log
+from allmydata.introducer.common import make_index
+
+class IntroducerNode(node.Node):
+    PORTNUMFILE = "introducer.port"
+    NODETYPE = "introducer"
+
+    def __init__(self, basedir="."):
+        node.Node.__init__(self, basedir)
+        self.init_introducer()
+        webport = self.get_config("webport")
+        if webport:
+            self.init_web(webport) # strports string
+
+    def init_introducer(self):
+        introducerservice = IntroducerService(self.basedir)
+        self.add_service(introducerservice)
+
+        d = self.when_tub_ready()
+        def _publish(res):
+            self.introducer_url = self.tub.registerReference(introducerservice,
+                                                             "introducer")
+            self.log(" introducer is at %s" % self.introducer_url)
+            self.write_config("introducer.furl", self.introducer_url + "\n")
+        d.addCallback(_publish)
+        d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
+
+    def init_web(self, webport):
+        self.log("init_web(webport=%s)", args=(webport,))
+
+        from allmydata.webish import IntroducerWebishServer
+        nodeurl_path = os.path.join(self.basedir, "node.url")
+        ws = IntroducerWebishServer(webport, nodeurl_path)
+        self.add_service(ws)
+
+class IntroducerService(service.MultiService, Referenceable):
+    implements(RIIntroducerPublisherAndSubscriberService)
+    name = "introducer"
+
+    def __init__(self, basedir="."):
+        service.MultiService.__init__(self)
+        self.introducer_url = None
+        # 'index' is (tubid, service_name)
+        self._announcements = {} # dict of index -> (announcement, timestamp)
+        self._subscribers = {} # dict of (rref->timestamp) dicts
+
+    def log(self, *args, **kwargs):
+        if "facility" not in kwargs:
+            kwargs["facility"] = "tahoe.introducer"
+        return log.msg(*args, **kwargs)
+
+    def get_announcements(self):
+        return self._announcements
+    def get_subscribers(self):
+        return self._subscribers
+
+    def remote_publish(self, announcement):
+        self.log("introducer: announcement published: %s" % (announcement,) )
+        index = make_index(announcement)
+        if index in self._announcements:
+            (old_announcement, timestamp) = self._announcements[index]
+            if old_announcement == announcement:
+                self.log("but we already knew it, ignoring", level=log.NOISY)
+                return
+            else:
+                self.log("old announcement being updated", level=log.NOISY)
+        self._announcements[index] = (announcement, time.time())
+        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+        for s in self._subscribers.get(service_name, []):
+            s.callRemote("announce", set([announcement]))
+
+    def remote_subscribe(self, subscriber, service_name):
+        self.log("introducer: subscription[%s] request at %s" % (service_name,
+                                                                 subscriber))
+        if service_name not in self._subscribers:
+            self._subscribers[service_name] = {}
+        subscribers = self._subscribers[service_name]
+        if subscriber in subscribers:
+            self.log("but they're already subscribed, ignoring",
+                     level=log.UNUSUAL)
+            return
+        subscribers[subscriber] = time.time()
+        def _remove():
+            self.log("introducer: unsubscribing[%s] %s" % (service_name,
+                                                           subscriber))
+            subscribers.pop(subscriber, None)
+        subscriber.notifyOnDisconnect(_remove)
+
+        announcements = set( [ ann
+                               for idx,(ann,when) in self._announcements.items()
+                               if idx[1] == service_name] )
+        d = subscriber.callRemote("announce", announcements)
+        d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
+
+
+
index a8e169a029344f9de0921b8bfc75463f5addcdb7..3d0a9085ab3839efa171de6a6fa19b8435230f1c 100644 (file)
@@ -5,11 +5,12 @@ from twisted.application import service
 from twisted.python import log
 
 import allmydata
-from allmydata import client, introducer
+from allmydata import client
+from allmydata.introducer.client import IntroducerClient
 from allmydata.util import base32, testutil
 from foolscap.eventual import flushEventualQueue
 
-class FakeIntroducerClient(introducer.IntroducerClient):
+class FakeIntroducerClient(IntroducerClient):
     def __init__(self):
         self._connections = set()
     def add_peer(self, nodeid):
index 53afbda7a349a921160556cb5e3306c638e616da..e0903bf3e40cbd69343bea6bfddb079dacf93910 100644 (file)
@@ -9,7 +9,10 @@ from twisted.python import log
 from foolscap import Tub, Referenceable
 from foolscap.eventual import fireEventually, flushEventualQueue
 from twisted.application import service
-from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode
+from allmydata.introducer.client import IntroducerClient
+from allmydata.introducer.server import IntroducerService
+# test compatibility with old introducer .tac files
+from allmydata.introducer import IntroducerNode
 from allmydata.util import testutil, idlib
 
 class FakeNode(Referenceable):
index 761fbb276c5e5fdfc4c272723c68b3c1011b214b..e3fac3e7f6f96bb05cca81b408428f5006dac786 100644 (file)
@@ -9,7 +9,7 @@ from twisted.internet.error import ConnectionDone, ConnectionLost
 from twisted.application import service
 import allmydata
 from allmydata import client, uri, download, upload, storage, offloaded
-from allmydata.introducer import IntroducerNode
+from allmydata.introducer.server import IntroducerNode
 from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil
 from allmydata.util import log
 from allmydata.scripts import runner, cli