From 28f4652b96d9889b6a7cae82ce42aec360498f54 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 18 Jun 2008 12:24:16 -0700 Subject: [PATCH] break introducer up into separate modules in the new allmydata.introducer package --- src/allmydata/client.py | 2 +- src/allmydata/introducer/__init__.py | 9 ++ .../{introducer.py => introducer/client.py} | 106 +----------------- src/allmydata/introducer/common.py | 11 ++ src/allmydata/introducer/server.py | 103 +++++++++++++++++ src/allmydata/test/test_client.py | 5 +- src/allmydata/test/test_introducer.py | 5 +- src/allmydata/test/test_system.py | 2 +- 8 files changed, 135 insertions(+), 108 deletions(-) create mode 100644 src/allmydata/introducer/__init__.py rename src/allmydata/{introducer.py => introducer/client.py} (71%) create mode 100644 src/allmydata/introducer/common.py create mode 100644 src/allmydata/introducer/server.py diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 82c97735..959e55b0 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -17,7 +17,7 @@ from allmydata.download import Downloader from allmydata.checker import Checker from allmydata.offloaded import Helper from allmydata.control import ControlServer -from allmydata.introducer import IntroducerClient +from allmydata.introducer.client import IntroducerClient from allmydata.util import hashutil, base32, testutil from allmydata.filenode import FileNode from allmydata.dirnode import NewDirectoryNode diff --git a/src/allmydata/introducer/__init__.py b/src/allmydata/introducer/__init__.py new file mode 100644 index 00000000..a9cab7e6 --- /dev/null +++ b/src/allmydata/introducer/__init__.py @@ -0,0 +1,9 @@ + +# This is for compatibilty with old .tac files, which reference +# allmydata.introducer.IntroducerNode + +from server import IntroducerNode + +# hush pyflakes +_unused = [IntroducerNode] +del _unused diff --git a/src/allmydata/introducer.py b/src/allmydata/introducer/client.py similarity index 71% rename from src/allmydata/introducer.py rename to src/allmydata/introducer/client.py index ddbc3b14..008aff10 100644 --- a/src/allmydata/introducer.py +++ b/src/allmydata/introducer/client.py @@ -1,112 +1,12 @@ -import re, time, sha, os.path +import re, time, sha from base64 import b32decode from zope.interface import implements from twisted.application import service from foolscap import Referenceable -from allmydata import node -from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \ - RIIntroducerSubscriberClient, IIntroducerClient +from allmydata.interfaces import RIIntroducerSubscriberClient, IIntroducerClient from allmydata.util import log, idlib - -class IntroducerNode(node.Node): - PORTNUMFILE = "introducer.port" - NODETYPE = "introducer" - - def __init__(self, basedir="."): - node.Node.__init__(self, basedir) - self.init_introducer() - webport = self.get_config("webport") - if webport: - self.init_web(webport) # strports string - - def init_introducer(self): - introducerservice = IntroducerService(self.basedir) - self.add_service(introducerservice) - - d = self.when_tub_ready() - def _publish(res): - self.introducer_url = self.tub.registerReference(introducerservice, - "introducer") - self.log(" introducer is at %s" % self.introducer_url) - self.write_config("introducer.furl", self.introducer_url + "\n") - d.addCallback(_publish) - d.addErrback(log.err, facility="tahoe.init", level=log.BAD) - - def init_web(self, webport): - self.log("init_web(webport=%s)", args=(webport,)) - - from allmydata.webish import IntroducerWebishServer - nodeurl_path = os.path.join(self.basedir, "node.url") - ws = IntroducerWebishServer(webport, nodeurl_path) - self.add_service(ws) - -def make_index(announcement): - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - m = re.match(r'pb://(\w+)@', furl) - assert m - nodeid = b32decode(m.group(1).upper()) - return (nodeid, service_name) - -class IntroducerService(service.MultiService, Referenceable): - implements(RIIntroducerPublisherAndSubscriberService) - name = "introducer" - - def __init__(self, basedir="."): - service.MultiService.__init__(self) - self.introducer_url = None - # 'index' is (tubid, service_name) - self._announcements = {} # dict of index -> (announcement, timestamp) - self._subscribers = {} # dict of (rref->timestamp) dicts - - def log(self, *args, **kwargs): - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" - return log.msg(*args, **kwargs) - - def get_announcements(self): - return self._announcements - def get_subscribers(self): - return self._subscribers - - def remote_publish(self, announcement): - self.log("introducer: announcement published: %s" % (announcement,) ) - index = make_index(announcement) - if index in self._announcements: - (old_announcement, timestamp) = self._announcements[index] - if old_announcement == announcement: - self.log("but we already knew it, ignoring", level=log.NOISY) - return - else: - self.log("old announcement being updated", level=log.NOISY) - self._announcements[index] = (announcement, time.time()) - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - for s in self._subscribers.get(service_name, []): - s.callRemote("announce", set([announcement])) - - def remote_subscribe(self, subscriber, service_name): - self.log("introducer: subscription[%s] request at %s" % (service_name, - subscriber)) - if service_name not in self._subscribers: - self._subscribers[service_name] = {} - subscribers = self._subscribers[service_name] - if subscriber in subscribers: - self.log("but they're already subscribed, ignoring", - level=log.UNUSUAL) - return - subscribers[subscriber] = time.time() - def _remove(): - self.log("introducer: unsubscribing[%s] %s" % (service_name, - subscriber)) - subscribers.pop(subscriber, None) - subscriber.notifyOnDisconnect(_remove) - - announcements = set( [ ann - for idx,(ann,when) in self._announcements.items() - if idx[1] == service_name] ) - d = subscriber.callRemote("announce", announcements) - d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) - +from allmydata.introducer.common import make_index class RemoteServiceConnector: diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py new file mode 100644 index 00000000..54f611a5 --- /dev/null +++ b/src/allmydata/introducer/common.py @@ -0,0 +1,11 @@ + +import re +from base64 import b32decode + +def make_index(announcement): + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + m = re.match(r'pb://(\w+)@', furl) + assert m + nodeid = b32decode(m.group(1).upper()) + return (nodeid, service_name) + diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py new file mode 100644 index 00000000..35c1087e --- /dev/null +++ b/src/allmydata/introducer/server.py @@ -0,0 +1,103 @@ + +import time, os.path +from zope.interface import implements +from twisted.application import service +from foolscap import Referenceable +from allmydata import node +from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService +from allmydata.util import log +from allmydata.introducer.common import make_index + +class IntroducerNode(node.Node): + PORTNUMFILE = "introducer.port" + NODETYPE = "introducer" + + def __init__(self, basedir="."): + node.Node.__init__(self, basedir) + self.init_introducer() + webport = self.get_config("webport") + if webport: + self.init_web(webport) # strports string + + def init_introducer(self): + introducerservice = IntroducerService(self.basedir) + self.add_service(introducerservice) + + d = self.when_tub_ready() + def _publish(res): + self.introducer_url = self.tub.registerReference(introducerservice, + "introducer") + self.log(" introducer is at %s" % self.introducer_url) + self.write_config("introducer.furl", self.introducer_url + "\n") + d.addCallback(_publish) + d.addErrback(log.err, facility="tahoe.init", level=log.BAD) + + def init_web(self, webport): + self.log("init_web(webport=%s)", args=(webport,)) + + from allmydata.webish import IntroducerWebishServer + nodeurl_path = os.path.join(self.basedir, "node.url") + ws = IntroducerWebishServer(webport, nodeurl_path) + self.add_service(ws) + +class IntroducerService(service.MultiService, Referenceable): + implements(RIIntroducerPublisherAndSubscriberService) + name = "introducer" + + def __init__(self, basedir="."): + service.MultiService.__init__(self) + self.introducer_url = None + # 'index' is (tubid, service_name) + self._announcements = {} # dict of index -> (announcement, timestamp) + self._subscribers = {} # dict of (rref->timestamp) dicts + + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + def get_announcements(self): + return self._announcements + def get_subscribers(self): + return self._subscribers + + def remote_publish(self, announcement): + self.log("introducer: announcement published: %s" % (announcement,) ) + index = make_index(announcement) + if index in self._announcements: + (old_announcement, timestamp) = self._announcements[index] + if old_announcement == announcement: + self.log("but we already knew it, ignoring", level=log.NOISY) + return + else: + self.log("old announcement being updated", level=log.NOISY) + self._announcements[index] = (announcement, time.time()) + (furl, service_name, ri_name, nickname, ver, oldest) = announcement + for s in self._subscribers.get(service_name, []): + s.callRemote("announce", set([announcement])) + + def remote_subscribe(self, subscriber, service_name): + self.log("introducer: subscription[%s] request at %s" % (service_name, + subscriber)) + if service_name not in self._subscribers: + self._subscribers[service_name] = {} + subscribers = self._subscribers[service_name] + if subscriber in subscribers: + self.log("but they're already subscribed, ignoring", + level=log.UNUSUAL) + return + subscribers[subscriber] = time.time() + def _remove(): + self.log("introducer: unsubscribing[%s] %s" % (service_name, + subscriber)) + subscribers.pop(subscriber, None) + subscriber.notifyOnDisconnect(_remove) + + announcements = set( [ ann + for idx,(ann,when) in self._announcements.items() + if idx[1] == service_name] ) + d = subscriber.callRemote("announce", announcements) + d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) + + + diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index a8e169a0..3d0a9085 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -5,11 +5,12 @@ from twisted.application import service from twisted.python import log import allmydata -from allmydata import client, introducer +from allmydata import client +from allmydata.introducer.client import IntroducerClient from allmydata.util import base32, testutil from foolscap.eventual import flushEventualQueue -class FakeIntroducerClient(introducer.IntroducerClient): +class FakeIntroducerClient(IntroducerClient): def __init__(self): self._connections = set() def add_peer(self, nodeid): diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 53afbda7..e0903bf3 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -9,7 +9,10 @@ from twisted.python import log from foolscap import Tub, Referenceable from foolscap.eventual import fireEventually, flushEventualQueue from twisted.application import service -from allmydata.introducer import IntroducerClient, IntroducerService, IntroducerNode +from allmydata.introducer.client import IntroducerClient +from allmydata.introducer.server import IntroducerService +# test compatibility with old introducer .tac files +from allmydata.introducer import IntroducerNode from allmydata.util import testutil, idlib class FakeNode(Referenceable): diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 761fbb27..e3fac3e7 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -9,7 +9,7 @@ from twisted.internet.error import ConnectionDone, ConnectionLost from twisted.application import service import allmydata from allmydata import client, uri, download, upload, storage, offloaded -from allmydata.introducer import IntroducerNode +from allmydata.introducer.server import IntroducerNode from allmydata.util import deferredutil, fileutil, idlib, mathutil, testutil from allmydata.util import log from allmydata.scripts import runner, cli -- 2.45.2