-import re, time, sha
-from base64 import b32decode
+import time
from zope.interface import implements
from twisted.application import service
-from foolscap import Referenceable
-from allmydata.interfaces import RIIntroducerSubscriberClient, IIntroducerClient
-from allmydata.util import log, idlib
-from allmydata.introducer.common import make_index
-
-
-class RemoteServiceConnector:
- """I hold information about a peer service that we want to connect to. If
- we are connected, I hold the RemoteReference, the peer's address, and the
- peer's version information. I remember information about when we were
- last connected to the peer too, even if we aren't currently connected.
-
- @ivar announcement_time: when we first heard about this service
- @ivar last_connect_time: when we last established a connection
- @ivar last_loss_time: when we last lost a connection
-
- @ivar version: the peer's version, from the most recent announcement
- @ivar oldest_supported: the peer's oldest supported version, same
- @ivar nickname: the peer's self-reported nickname, same
-
- @ivar rref: the RemoteReference, if connected, otherwise None
- @ivar remote_host: the IAddress, if connected, otherwise None
- """
-
- def __init__(self, announcement, tub, ic):
- self._tub = tub
- self._announcement = announcement
- self._ic = ic
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
-
- self._furl = furl
- m = re.match(r'pb://(\w+)@', furl)
- assert m
- self._nodeid = b32decode(m.group(1).upper())
- self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
-
- self.service_name = service_name
-
- self.log("attempting to connect to %s" % self._nodeid_s)
- self.announcement_time = time.time()
- self.last_loss_time = None
- self.rref = None
- self.remote_host = None
- self.last_connect_time = None
- self.version = ver
- self.oldest_supported = oldest
- self.nickname = nickname
-
- def log(self, *args, **kwargs):
- return self._ic.log(*args, **kwargs)
-
- def startConnecting(self):
- self._reconnector = self._tub.connectTo(self._furl, self._got_service)
-
- def stopConnecting(self):
- self._reconnector.stopConnecting()
-
- def _got_service(self, rref):
- self.last_connect_time = time.time()
- self.remote_host = rref.tracker.broker.transport.getPeer()
-
- self.rref = rref
- self.log("connected to %s" % self._nodeid_s)
-
- self._ic.add_connection(self._nodeid, self.service_name, rref)
-
- rref.notifyOnDisconnect(self._lost, rref)
-
- def _lost(self, rref):
- self.log("lost connection to %s" % self._nodeid_s)
- self.last_loss_time = time.time()
- self.rref = None
- self.remote_host = None
- self._ic.remove_connection(self._nodeid, self.service_name, rref)
-
- def reset(self):
- self._reconnector.reset()
+from foolscap.api import Referenceable, eventually, RemoteInterface
+from allmydata.interfaces import InsufficientVersionError
+from allmydata.introducer.interfaces import IIntroducerClient, \
+ RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
+from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
+ convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
+ make_index, get_tubid_string_from_ann, get_tubid_string
+from allmydata.util import log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+from allmydata.util.keyutil import BadSignatureError
+
+class WrapV2ClientInV1Interface(Referenceable): # for_v1
+ """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
+ can be attached to an old server."""
+ implements(RIIntroducerSubscriberClient_v1)
+
+ def __init__(self, original):
+ self.original = original
+ def remote_announce(self, announcements):
+ lp = self.original.log("received %d announcements (v1)" %
+ len(announcements))
+ anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
+ for ann_v1 in announcements])
+ return self.original.got_announcements(anns_v1, lp)
+
+class RIStubClient(RemoteInterface): # for_v1
+ """Each client publishes a service announcement for a dummy object called
+ the StubClient. This object doesn't actually offer any services, but the
+ announcement helps the Introducer keep track of which clients are
+ subscribed (so the grid admin can keep track of things like the size of
+ the grid and the client versions in use. This is the (empty)
+ RemoteInterface for the StubClient."""
+
+class StubClient(Referenceable): # for_v1
+ implements(RIStubClient)
+
+V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
+V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
class IntroducerClient(service.Service, Referenceable):
- implements(RIIntroducerSubscriberClient, IIntroducerClient)
+ implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
def __init__(self, tub, introducer_furl,
- nickname, my_version, oldest_supported):
+ nickname, my_version, oldest_supported,
+ app_versions, sequencer):
self._tub = tub
self.introducer_furl = introducer_furl
+ assert type(nickname) is unicode
self._nickname = nickname
self._my_version = my_version
self._oldest_supported = oldest_supported
-
- self._published_announcements = set()
+ self._app_versions = app_versions
+ self._sequencer = sequencer
+
+ self._my_subscriber_info = { "version": 0,
+ "nickname": self._nickname,
+ "app-versions": self._app_versions,
+ "my-version": self._my_version,
+ "oldest-supported": self._oldest_supported,
+ }
+ self._stub_client = None # for_v1
+ self._stub_client_furl = None
+
+ self._outbound_announcements = {} # not signed
+ self._published_announcements = {} # signed
+ self._canary = Referenceable()
self._publisher = None
- self._connected = False
+ self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
self._subscribed_service_names = set()
self._subscriptions = set() # requests we've actually sent
- self._received_announcements = set()
- # TODO: this set will grow without bound, until the node is restarted
-
- # we only accept one announcement per (peerid+service_name) pair.
- # This insures that an upgraded host replace their previous
- # announcement. It also means that each peer must have their own Tub
- # (no sharing), which is slightly weird but consistent with the rest
- # of the Tahoe codebase.
- self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
- # self._connections is a set of (peerid, service_name, rref) tuples
- self._connections = set()
- self.counter = 0 # incremented each time we change state, for tests
- self.encoding_parameters = None
+ # _inbound_announcements remembers one announcement per
+ # (servicename,serverid) pair. Anything that arrives with the same
+ # pair will displace the previous one. This stores tuples of
+ # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
+ # dicts can be compared for equality to distinguish re-announcement
+ # from updates. It also provides memory for clients who subscribe
+ # after startup.
+ self._inbound_announcements = {}
+
+ # hooks for unit tests
+ self._debug_counts = {
+ "inbound_message": 0,
+ "inbound_announcement": 0,
+ "wrong_service": 0,
+ "duplicate_announcement": 0,
+ "update": 0,
+ "new_announcement": 0,
+ "outbound_message": 0,
+ }
+ self._debug_outstanding = 0
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
def startService(self):
service.Service.startService(self)
+ self._introducer_error = None
rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
self._introducer_reconnector = rc
def connect_failed(failure):
self.log("Initial Introducer connection failed: perhaps it's down",
- level=log.WEIRD, failure=failure)
+ level=log.WEIRD, failure=failure, umid="c5MqUQ")
d = self._tub.getReference(self.introducer_furl)
d.addErrback(connect_failed)
def _got_introducer(self, publisher):
- self.log("connected to introducer")
- self._connected = True
+ self.log("connected to introducer, getting versions")
+ default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+ { },
+ "application-version": "unknown: no get_version()",
+ }
+ d = add_version_to_remote_reference(publisher, default)
+ d.addCallback(self._got_versioned_introducer)
+ d.addErrback(self._got_error)
+
+ def _got_error(self, f):
+ # TODO: for the introducer, perhaps this should halt the application
+ self._introducer_error = f # polled by tests
+
+ def _got_versioned_introducer(self, publisher):
+ self.log("got introducer version: %s" % (publisher.version,))
+ # we require an introducer that speaks at least one of (V1, V2)
+ if not (V1 in publisher.version or V2 in publisher.version):
+ raise InsufficientVersionError("V1 or V2", publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
def _disconnected(self):
self.log("bummer, we've lost our connection to the introducer")
- self._connected = False
self._publisher = None
self._subscriptions.clear()
- def stopService(self):
- service.Service.stopService(self)
- self._introducer_reconnector.stopConnecting()
- for rsc in self._connectors.itervalues():
- rsc.stopConnecting()
-
def log(self, *args, **kwargs):
if "facility" not in kwargs:
- kwargs["facility"] = "tahoe.introducer"
+ kwargs["facility"] = "tahoe.introducer.client"
return log.msg(*args, **kwargs)
-
- def publish(self, furl, service_name, remoteinterface_name):
- ann = (furl, service_name, remoteinterface_name,
- self._nickname, self._my_version, self._oldest_supported)
- self._published_announcements.add(ann)
- self._maybe_publish()
-
- def subscribe_to(self, service_name):
+ def subscribe_to(self, service_name, cb, *args, **kwargs):
+ self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
+ for index,(ann,key_s,when) in self._inbound_announcements.items():
+ servicename = index[0]
+ if servicename == service_name:
+ eventually(cb, key_s, ann, *args, **kwargs)
def _maybe_subscribe(self):
if not self._publisher:
level=log.NOISY)
return
for service_name in self._subscribed_service_names:
- if service_name not in self._subscriptions:
- # there is a race here, but the subscription desk ignores
- # duplicate requests.
- self._subscriptions.add(service_name)
- d = self._publisher.callRemote("subscribe", self, service_name)
- d.addErrback(log.err, facility="tahoe.introducer",
- level=log.WEIRD)
+ if service_name in self._subscriptions:
+ continue
+ self._subscriptions.add(service_name)
+ if V2 in self._publisher.version:
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("subscribe_v2",
+ self, service_name,
+ self._my_subscriber_info)
+ d.addBoth(self._debug_retired)
+ else:
+ d = self._subscribe_handle_v1(service_name) # for_v1
+ d.addErrback(log.err, facility="tahoe.introducer.client",
+ level=log.WEIRD, umid="2uMScQ")
+
+ def _subscribe_handle_v1(self, service_name): # for_v1
+ # they don't speak V2: must be a v1 introducer. Fall back to the v1
+ # 'subscribe' method, using a client adapter.
+ ca = WrapV2ClientInV1Interface(self)
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("subscribe", ca, service_name)
+ d.addBoth(self._debug_retired)
+ # We must also publish an empty 'stub_client' object, so the
+ # introducer can count how many clients are connected and see what
+ # versions they're running.
+ if not self._stub_client_furl:
+ self._stub_client = sc = StubClient()
+ self._stub_client_furl = self._tub.registerReference(sc)
+ def _publish_stub_client(ignored):
+ furl = self._stub_client_furl
+ self.publish("stub_client",
+ { "anonymous-storage-FURL": furl,
+ "permutation-seed-base32": get_tubid_string(furl),
+ })
+ d.addCallback(_publish_stub_client)
+ return d
+
+ def create_announcement_dict(self, service_name, ann):
+ ann_d = { "version": 0,
+ # "seqnum" and "nonce" will be populated with new values in
+ # publish(), each time we make a change
+ "nickname": self._nickname,
+ "app-versions": self._app_versions,
+ "my-version": self._my_version,
+ "oldest-supported": self._oldest_supported,
+
+ "service-name": service_name,
+ }
+ ann_d.update(ann)
+ return ann_d
+
+ def publish(self, service_name, ann, signing_key=None):
+ # we increment the seqnum every time we publish something new
+ current_seqnum, current_nonce = self._sequencer()
+
+ ann_d = self.create_announcement_dict(service_name, ann)
+ self._outbound_announcements[service_name] = ann_d
+
+ # publish all announcements with the new seqnum and nonce
+ for service_name,ann_d in self._outbound_announcements.items():
+ ann_d["seqnum"] = current_seqnum
+ ann_d["nonce"] = current_nonce
+ ann_t = sign_to_foolscap(ann_d, signing_key)
+ self._published_announcements[service_name] = ann_t
+ self._maybe_publish()
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
return
# this re-publishes everything. The Introducer ignores duplicates
- for ann in self._published_announcements:
- d = self._publisher.callRemote("publish", ann)
- d.addErrback(log.err, facility="tahoe.introducer",
- level=log.WEIRD)
-
-
-
- def remote_announce(self, announcements):
- for ann in announcements:
- self.log("received %d announcements" % len(announcements))
- (furl, service_name, ri_name, nickname, ver, oldest) = ann
- if service_name not in self._subscribed_service_names:
- self.log("announcement for a service we don't care about [%s]"
- % (service_name,), level=log.WEIRD)
- continue
- if ann in self._received_announcements:
- self.log("ignoring old announcement: %s" % (ann,),
- level=log.NOISY)
+ for ann_t in self._published_announcements.values():
+ self._debug_counts["outbound_message"] += 1
+ if V2 in self._publisher.version:
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("publish_v2", ann_t,
+ self._canary)
+ d.addBoth(self._debug_retired)
+ else:
+ d = self._handle_v1_publisher(ann_t) # for_v1
+ d.addErrback(log.err, ann_t=ann_t,
+ facility="tahoe.introducer.client",
+ level=log.WEIRD, umid="xs9pVQ")
+
+ def _handle_v1_publisher(self, ann_t): # for_v1
+ # they don't speak V2, so fall back to the old 'publish' method
+ # (which takes an unsigned tuple of bytestrings)
+ self.log("falling back to publish_v1",
+ level=log.UNUSUAL, umid="9RCT1A")
+ ann_v1 = convert_announcement_v2_to_v1(ann_t)
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("publish", ann_v1)
+ d.addBoth(self._debug_retired)
+ return d
+
+
+ def remote_announce_v2(self, announcements):
+ lp = self.log("received %d announcements (v2)" % len(announcements))
+ return self.got_announcements(announcements, lp)
+
+ def got_announcements(self, announcements, lp=None):
+ # this is the common entry point for both v1 and v2 announcements
+ self._debug_counts["inbound_message"] += 1
+ for ann_t in announcements:
+ try:
+ # this might raise UnknownKeyError or bad-sig error
+ ann, key_s = unsign_from_foolscap(ann_t)
+ # key is "v0-base32abc123"
+ except BadSignatureError:
+ self.log("bad signature on inbound announcement: %s" % (ann_t,),
+ parent=lp, level=log.WEIRD, umid="ZAU15Q")
+ # process other announcements that arrived with the bad one
continue
- self.log("new announcement[%s]: %s" % (service_name, ann))
- self._received_announcements.add(ann)
- self._new_announcement(ann)
-
- def _new_announcement(self, announcement):
- # this will only be called for new announcements
- index = make_index(announcement)
- if index in self._connectors:
- self.log("replacing earlier announcement", level=log.NOISY)
- self._connectors[index].stopConnecting()
- rsc = RemoteServiceConnector(announcement, self._tub, self)
- self._connectors[index] = rsc
- rsc.startConnecting()
-
- def add_connection(self, nodeid, service_name, rref):
- self._connections.add( (nodeid, service_name, rref) )
- self.counter += 1
- # when one connection is established, reset the timers on all others,
- # to trigger a reconnection attempt in one second. This is intended
- # to accelerate server connections when we've been offline for a
- # while. The goal is to avoid hanging out for a long time with
- # connections to only a subset of the servers, which would increase
- # the chances that we'll put shares in weird places (and not update
- # existing shares of mutable files). See #374 for more details.
- for rsc in self._connectors.values():
- rsc.reset()
-
- def remove_connection(self, nodeid, service_name, rref):
- self._connections.discard( (nodeid, service_name, rref) )
- self.counter += 1
-
-
- def get_all_connections(self):
- return frozenset(self._connections)
-
- def get_all_connectors(self):
- return self._connectors.copy()
-
- def get_all_peerids(self):
- return frozenset([peerid
- for (peerid, service_name, rref)
- in self._connections])
-
- def get_all_connections_for(self, service_name):
- return frozenset([c
- for c in self._connections
- if c[1] == service_name])
-
- def get_permuted_peers(self, service_name, key):
- """Return an ordered list of (peerid, rref) tuples."""
-
- results = []
- for (c_peerid, c_service_name, rref) in self._connections:
- assert isinstance(c_peerid, str)
- if c_service_name != service_name:
- continue
- permuted = sha.new(key + c_peerid).digest()
- results.append((permuted, c_peerid, rref))
-
- results.sort(lambda a,b: cmp(a[0], b[0]))
- return [ (r[1], r[2]) for r in results ]
+ self._process_announcement(ann, key_s)
+ def _process_announcement(self, ann, key_s):
+ self._debug_counts["inbound_announcement"] += 1
+ service_name = str(ann["service-name"])
+ if service_name not in self._subscribed_service_names:
+ self.log("announcement for a service we don't care about [%s]"
+ % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+ self._debug_counts["wrong_service"] += 1
+ return
+ # for ASCII values, simplejson might give us unicode *or* bytes
+ if "nickname" in ann and isinstance(ann["nickname"], str):
+ ann["nickname"] = unicode(ann["nickname"])
+ nick_s = ann.get("nickname",u"").encode("utf-8")
+ lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
+ nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
+
+ # how do we describe this node in the logs?
+ desc_bits = []
+ if key_s:
+ desc_bits.append("serverid=" + key_s[:20])
+ if "anonymous-storage-FURL" in ann:
+ tubid_s = get_tubid_string_from_ann(ann)
+ desc_bits.append("tubid=" + tubid_s[:8])
+ description = "/".join(desc_bits)
+
+ # the index is used to track duplicates
+ index = make_index(ann, key_s)
+
+ # is this announcement a duplicate?
+ if (index in self._inbound_announcements
+ and self._inbound_announcements[index][0] == ann):
+ self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
+ service=service_name, description=description,
+ parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
+ self._debug_counts["duplicate_announcement"] += 1
+ return
- def remote_set_encoding_parameters(self, parameters):
- self.encoding_parameters = parameters
+ # does it update an existing one?
+ if index in self._inbound_announcements:
+ old,_,_ = self._inbound_announcements[index]
+ if "seqnum" in old:
+ # must beat previous sequence number to replace
+ if ("seqnum" not in ann
+ or not isinstance(ann["seqnum"], (int,long))):
+ self.log("not replacing old announcement, no valid seqnum: %s"
+ % (ann,),
+ parent=lp2, level=log.NOISY, umid="zFGH3Q")
+ return
+ if ann["seqnum"] <= old["seqnum"]:
+ # note that exact replays are caught earlier, by
+ # comparing the entire signed announcement.
+ self.log("not replacing old announcement, "
+ "new seqnum is too old (%s <= %s) "
+ "(replay attack?): %s"
+ % (ann["seqnum"], old["seqnum"], ann),
+ parent=lp2, level=log.UNUSUAL, umid="JAAAoQ")
+ return
+ # ok, seqnum is newer, allow replacement
+ self._debug_counts["update"] += 1
+ self.log("replacing old announcement: %s" % (ann,),
+ parent=lp2, level=log.NOISY, umid="wxwgIQ")
+ else:
+ self._debug_counts["new_announcement"] += 1
+ self.log("new announcement[%s]" % service_name,
+ parent=lp2, level=log.NOISY)
+
+ self._inbound_announcements[index] = (ann, key_s, time.time())
+ # note: we never forget an index, but we might update its value
+
+ for (service_name2,cb,args,kwargs) in self._local_subscribers:
+ if service_name2 == service_name:
+ eventually(cb, key_s, ann, *args, **kwargs)
def connected_to_introducer(self):
- return self._connected
-
- def debug_disconnect_from_peerid(self, victim_nodeid):
- # for unit tests: locate and sever all connections to the given
- # peerid.
- for (nodeid, service_name, rref) in self._connections:
- if nodeid == victim_nodeid:
- rref.tracker.broker.transport.loseConnection()
+ return bool(self._publisher)