-import time, os.path
+import time, os.path, textwrap
from zope.interface import implements
from twisted.application import service
-from foolscap import Referenceable
+from foolscap.api import Referenceable
+import allmydata
from allmydata import node
-from allmydata.util import log
+from allmydata.util import log, rrefutil
+from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.introducer.interfaces import \
- RIIntroducerPublisherAndSubscriberService
-from allmydata.introducer.common import make_index
+ RIIntroducerPublisherAndSubscriberService_v2
+from allmydata.introducer.common import convert_announcement_v1_to_v2, \
+ convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
+ get_tubid_string_from_ann, SubscriberDescriptor, AnnouncementDescriptor
+
+class FurlFileConflictError(Exception):
+ pass
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
NODETYPE = "introducer"
+ GENERATED_FILES = ['introducer.furl']
- def __init__(self, basedir="."):
+ def __init__(self, basedir=u"."):
node.Node.__init__(self, basedir)
+ self.read_config()
self.init_introducer()
- webport = self.get_config("webport")
+ webport = self.get_config("node", "web.port", None)
if webport:
self.init_web(webport) # strports string
introducerservice = IntroducerService(self.basedir)
self.add_service(introducerservice)
+ old_public_fn = os.path.join(self.basedir, u"introducer.furl")
+ private_fn = os.path.join(self.basedir, u"private", u"introducer.furl")
+
+ if os.path.exists(old_public_fn):
+ if os.path.exists(private_fn):
+ msg = """This directory (%s) contains both an old public
+ 'introducer.furl' file, and a new-style
+ 'private/introducer.furl', so I cannot safely remove the old
+ one. Please make sure your desired FURL is in
+ private/introducer.furl, and remove the public file. If this
+ causes your Introducer's FURL to change, you need to inform
+ all grid members so they can update their tahoe.cfg.
+ """
+ raise FurlFileConflictError(textwrap.dedent(msg))
+ os.rename(old_public_fn, private_fn)
d = self.when_tub_ready()
def _publish(res):
- self.introducer_url = self.tub.registerReference(introducerservice,
- "introducer")
- self.log(" introducer is at %s" % self.introducer_url)
- self.write_config("introducer.furl", self.introducer_url + "\n")
+ furl = self.tub.registerReference(introducerservice,
+ furlFile=private_fn)
+ self.log(" introducer is at %s" % furl, umid="qF2L9A")
+ self.introducer_url = furl # for tests
d.addCallback(_publish)
- d.addErrback(log.err, facility="tahoe.init", level=log.BAD)
+ d.addErrback(log.err, facility="tahoe.init",
+ level=log.BAD, umid="UaNs9A")
def init_web(self, webport):
- self.log("init_web(webport=%s)", args=(webport,))
+ self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
from allmydata.webish import IntroducerWebishServer
- nodeurl_path = os.path.join(self.basedir, "node.url")
- ws = IntroducerWebishServer(webport, nodeurl_path)
+ nodeurl_path = os.path.join(self.basedir, u"node.url")
+ config_staticdir = self.get_config("node", "web.static", "public_html").decode('utf-8')
+ staticdir = abspath_expanduser_unicode(config_staticdir, base=self.basedir)
+ ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
self.add_service(ws)
+class WrapV1SubscriberInV2Interface: # for_v1
+ """I wrap a RemoteReference that points at an old v1 subscriber, enabling
+ it to be treated like a v2 subscriber.
+ """
+
+ def __init__(self, original):
+ self.original = original # also used for tests
+ def __eq__(self, them):
+ return self.original == them
+ def __ne__(self, them):
+ return self.original != them
+ def __hash__(self):
+ return hash(self.original)
+ def getRemoteTubID(self):
+ return self.original.getRemoteTubID()
+ def getSturdyRef(self):
+ return self.original.getSturdyRef()
+ def getPeer(self):
+ return self.original.getPeer()
+ def getLocationHints(self):
+ return self.original.getLocationHints()
+ def callRemote(self, methname, *args, **kwargs):
+ m = getattr(self, "wrap_" + methname)
+ return m(*args, **kwargs)
+ def wrap_announce_v2(self, announcements):
+ anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
+ return self.original.callRemote("announce", set(anns_v1))
+ def notifyOnDisconnect(self, *args, **kwargs):
+ return self.original.notifyOnDisconnect(*args, **kwargs)
+
class IntroducerService(service.MultiService, Referenceable):
- implements(RIIntroducerPublisherAndSubscriberService)
+ implements(RIIntroducerPublisherAndSubscriberService_v2)
name = "introducer"
+ # v1 is the original protocol, supported since 1.0 (but only advertised
+ # starting in 1.3). v2 is the new signed protocol, supported after 1.9
+ VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
+ "http://allmydata.org/tahoe/protocols/introducer/v2": { },
+ "application-version": str(allmydata.__full_version__),
+ }
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
- # 'index' is (tubid, service_name)
- self._announcements = {} # dict of index -> (announcement, timestamp)
- self._subscribers = {} # dict of (rref->timestamp) dicts
+ # 'index' is (service_name, key_s, tubid), where key_s or tubid is
+ # None
+ self._announcements = {} # dict of index ->
+ # (ann_t, canary, ann, timestamp)
+
+ # ann (the announcement dictionary) is cleaned up: nickname is always
+ # unicode, servicename is always ascii, etc, even though
+ # simplejson.loads sometimes returns either
+
+ # self._subscribers is a dict mapping servicename to subscriptions
+ # 'subscriptions' is a dict mapping rref to a subscription
+ # 'subscription' is a tuple of (subscriber_info, timestamp)
+ # 'subscriber_info' is a dict, provided directly for v2 clients, or
+ # synthesized for v1 clients. The expected keys are:
+ # version, nickname, app-versions, my-version, oldest-supported
+ self._subscribers = {}
+
+ # self._stub_client_announcements contains the information provided
+ # by v1 clients. We stash this so we can match it up with their
+ # subscriptions.
+ self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
+
+ self._debug_counts = {"inbound_message": 0,
+ "inbound_duplicate": 0,
+ "inbound_no_seqnum": 0,
+ "inbound_old_replay": 0,
+ "inbound_update": 0,
+ "outbound_message": 0,
+ "outbound_announcements": 0,
+ "inbound_subscribe": 0}
+ self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
- kwargs["facility"] = "tahoe.introducer"
+ kwargs["facility"] = "tahoe.introducer.server"
return log.msg(*args, **kwargs)
- def get_announcements(self):
- return self._announcements
+ def get_announcements(self, include_stub_clients=True):
+ """Return a list of AnnouncementDescriptor for all announcements"""
+ announcements = []
+ for (index, (_, canary, ann, when)) in self._announcements.items():
+ if ann["service-name"] == "stub_client":
+ if not include_stub_clients:
+ continue
+ ad = AnnouncementDescriptor(when, index, canary, ann)
+ announcements.append(ad)
+ return announcements
+
def get_subscribers(self):
- return self._subscribers
-
- def remote_publish(self, announcement):
- self.log("introducer: announcement published: %s" % (announcement,) )
- index = make_index(announcement)
- if index in self._announcements:
- (old_announcement, timestamp) = self._announcements[index]
- if old_announcement == announcement:
- self.log("but we already knew it, ignoring", level=log.NOISY)
+ """Return a list of SubscriberDescriptor objects for all subscribers"""
+ s = []
+ for service_name, subscriptions in self._subscribers.items():
+ for rref,(subscriber_info,when) in subscriptions.items():
+ # note that if the subscriber didn't do Tub.setLocation,
+ # tubid will be None. Also, subscribers do not tell us which
+ # pubkey they use; only publishers do that.
+ tubid = rref.getRemoteTubID() or "?"
+ remote_address = rrefutil.stringify_remote_address(rref)
+ # these three assume subscriber_info["version"]==0, but
+ # should tolerate other versions
+ if not subscriber_info:
+ # V1 clients that haven't yet sent their stub_info data
+ subscriber_info = {}
+ nickname = subscriber_info.get("nickname", u"?")
+ version = subscriber_info.get("my-version", u"?")
+ app_versions = subscriber_info.get("app-versions", {})
+ # 'when' is the time they subscribed
+ sd = SubscriberDescriptor(service_name, when,
+ nickname, version, app_versions,
+ remote_address, tubid)
+ s.append(sd)
+ return s
+
+ def remote_get_version(self):
+ return self.VERSION
+
+ def remote_publish(self, ann_t): # for_v1
+ lp = self.log("introducer: old (v1) announcement published: %s"
+ % (ann_t,), umid="6zGOIw")
+ ann_v2 = convert_announcement_v1_to_v2(ann_t)
+ return self.publish(ann_v2, None, lp)
+
+ def remote_publish_v2(self, ann_t, canary):
+ lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
+ return self.publish(ann_t, canary, lp)
+
+ def publish(self, ann_t, canary, lp):
+ try:
+ self._publish(ann_t, canary, lp)
+ except:
+ log.err(format="Introducer.remote_publish failed on %(ann)s",
+ ann=ann_t,
+ level=log.UNUSUAL, parent=lp, umid="620rWA")
+ raise
+
+ def _publish(self, ann_t, canary, lp):
+ self._debug_counts["inbound_message"] += 1
+ self.log("introducer: announcement published: %s" % (ann_t,),
+ umid="wKHgCw")
+ ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
+ index = make_index(ann, key)
+
+ service_name = str(ann["service-name"])
+ if service_name == "stub_client": # for_v1
+ self._attach_stub_client(ann, lp)
+ return
+
+ old = self._announcements.get(index)
+ if old:
+ (old_ann_t, canary, old_ann, timestamp) = old
+ if old_ann == ann:
+ self.log("but we already knew it, ignoring", level=log.NOISY,
+ umid="myxzLw")
+ self._debug_counts["inbound_duplicate"] += 1
return
else:
- self.log("old announcement being updated", level=log.NOISY)
- self._announcements[index] = (announcement, time.time())
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
+ if "seqnum" in old_ann:
+ # must beat previous sequence number to replace
+ if ("seqnum" not in ann
+ or not isinstance(ann["seqnum"], (int,long))):
+ self.log("not replacing old ann, no valid seqnum",
+ level=log.NOISY, umid="ySbaVw")
+ self._debug_counts["inbound_no_seqnum"] += 1
+ return
+ if ann["seqnum"] <= old_ann["seqnum"]:
+ self.log("not replacing old ann, new seqnum is too old"
+ " (%s <= %s) (replay attack?)"
+ % (ann["seqnum"], old_ann["seqnum"]),
+ level=log.UNUSUAL, umid="sX7yqQ")
+ self._debug_counts["inbound_old_replay"] += 1
+ return
+ # ok, seqnum is newer, allow replacement
+ self.log("old announcement being updated", level=log.NOISY,
+ umid="304r9g")
+ self._debug_counts["inbound_update"] += 1
+ self._announcements[index] = (ann_t, canary, ann, time.time())
+ #if canary:
+ # canary.notifyOnDisconnect ...
+ # use a CanaryWatcher? with cw.is_connected()?
+ # actually we just want foolscap to give rref.is_connected(), since
+ # this is only for the status display
+
for s in self._subscribers.get(service_name, []):
- s.callRemote("announce", set([announcement]))
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += 1
+ self._debug_outstanding += 1
+ d = s.callRemote("announce_v2", set([ann_t]))
+ d.addBoth(self._debug_retired)
+ d.addErrback(log.err,
+ format="subscriber errored on announcement %(ann)s",
+ ann=ann_t, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="jfGMXQ")
+
+ def _attach_stub_client(self, ann, lp):
+ # There might be a v1 subscriber for whom this is a stub_client.
+ # We might have received the subscription before the stub_client
+ # announcement, in which case we now need to fix up the record in
+ # self._subscriptions .
+
+ # record it for later, in case the stub_client arrived before the
+ # subscription
+ subscriber_info = self._get_subscriber_info_from_ann(ann)
+ ann_tubid = get_tubid_string_from_ann(ann)
+ self._stub_client_announcements[ann_tubid] = subscriber_info
+
+ lp2 = self.log("stub_client announcement, "
+ "looking for matching subscriber",
+ parent=lp, level=log.NOISY, umid="BTywDg")
+
+ for sn in self._subscribers:
+ s = self._subscribers[sn]
+ for (subscriber, info) in s.items():
+ # we correlate these by looking for a subscriber whose tubid
+ # matches this announcement
+ sub_tubid = subscriber.getRemoteTubID()
+ if sub_tubid == ann_tubid:
+ self.log(format="found a match, nodeid=%(nodeid)s",
+ nodeid=sub_tubid,
+ level=log.NOISY, parent=lp2, umid="xsWs1A")
+ # found a match. Does it need info?
+ if not info[0]:
+ self.log(format="replacing info",
+ level=log.NOISY, parent=lp2, umid="m5kxwA")
+ # yup
+ s[subscriber] = (subscriber_info, info[1])
+ # and we don't remember or announce stub_clients beyond what we
+ # need to get the subscriber_info set up
- def remote_subscribe(self, subscriber, service_name):
- self.log("introducer: subscription[%s] request at %s" % (service_name,
- subscriber))
+ def _get_subscriber_info_from_ann(self, ann): # for_v1
+ sinfo = { "version": ann["version"],
+ "nickname": ann["nickname"],
+ "app-versions": ann["app-versions"],
+ "my-version": ann["my-version"],
+ "oldest-supported": ann["oldest-supported"],
+ }
+ return sinfo
+
+ def remote_subscribe(self, subscriber, service_name): # for_v1
+ self.log("introducer: old (v1) subscription[%s] request at %s"
+ % (service_name, subscriber), umid="hJlGUg")
+ return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
+ service_name, None)
+
+ def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
+ self.log("introducer: subscription[%s] request at %s"
+ % (service_name, subscriber), umid="U3uzLg")
+ return self.add_subscriber(subscriber, service_name, subscriber_info)
+
+ def add_subscriber(self, subscriber, service_name, subscriber_info):
+ self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
- level=log.UNUSUAL)
+ level=log.UNUSUAL, umid="Sy9EfA")
return
- subscribers[subscriber] = time.time()
+
+ if not subscriber_info: # for_v1
+ # v1 clients don't provide subscriber_info, but they should
+ # publish a 'stub client' record which contains the same
+ # information. If we've already received this, it will be in
+ # self._stub_client_announcements
+ tubid = subscriber.getRemoteTubID()
+ if tubid in self._stub_client_announcements:
+ subscriber_info = self._stub_client_announcements[tubid]
+
+ subscribers[subscriber] = (subscriber_info, time.time())
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
- subscriber))
+ subscriber),
+ umid="vYGcJg")
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
- announcements = set( [ ann
- for idx,(ann,when) in self._announcements.items()
- if idx[1] == service_name] )
- d = subscriber.callRemote("announce", announcements)
- d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)
-
-
-
+ # now tell them about any announcements they're interested in
+ announcements = set( [ ann_t
+ for idx,(ann_t,canary,ann,when)
+ in self._announcements.items()
+ if idx[0] == service_name] )
+ if announcements:
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += len(announcements)
+ self._debug_outstanding += 1
+ d = subscriber.callRemote("announce_v2", announcements)
+ d.addBoth(self._debug_retired)
+ d.addErrback(log.err,
+ format="subscriber errored during subscribe %(anns)s",
+ anns=announcements, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="mtZepQ")
+ return d