import os, stat, time, weakref
-from allmydata.interfaces import RIStorageServer
from allmydata import node
from zope.interface import implements
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
-from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
import allmydata
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
-from allmydata.util import hashutil, base32, pollmixin, log
+from allmydata.util import hashutil, base32, pollmixin, log, keyutil
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.stats import StatsProvider
from allmydata.history import History
-from allmydata.interfaces import IStatsProducer, RIStubClient, \
- SDMF_VERSION, MDMF_VERSION
+from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import OldConfigOptionError
TiB=1024*GiB
PiB=1024*TiB
-class StubClient(Referenceable):
- implements(RIStubClient)
-
def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
ic = IntroducerClient(self.tub, self.introducer_furl,
self.nickname,
str(allmydata.__full_version__),
- str(self.OLDEST_SUPPORTED_VERSION))
+ str(self.OLDEST_SUPPORTED_VERSION),
+ self.get_app_versions())
self.introducer_client = ic
# hold off on starting the IntroducerClient until our tub has been
# started, so we'll have a useful address on our RemoteReference, so
self.convergence = base32.a2b(convergence_s)
self._secret_holder = SecretHolder(lease_secret, self.convergence)
+ def _maybe_create_server_key(self):
+ # we only create the key once. On all subsequent runs, we re-use the
+ # existing key
+ def _make_key():
+ sk_vs,vk_vs = keyutil.make_keypair()
+ return sk_vs+"\n"
+ sk_vs = self.get_or_create_private_config("server.privkey", _make_key)
+ sk,vk_vs = keyutil.parse_privkey(sk_vs.strip())
+ self.write_config("server.pubkey", vk_vs+"\n")
+ self._server_key = sk
+
+ def _init_permutation_seed(self, ss):
+ seed = self.get_config_from_file("permutation-seed")
+ if not seed:
+ have_shares = ss.have_shares()
+ if have_shares:
+ # if the server has shares but not a recorded
+ # permutation-seed, then it has been around since pre-#466
+ # days, and the clients who uploaded those shares used our
+ # TubID as a permutation-seed. We should keep using that same
+ # seed to keep the shares in the same place in the permuted
+ # ring, so those clients don't have to perform excessive
+ # searches.
+ seed = base32.b2a(self.nodeid)
+ else:
+ # otherwise, we're free to use the more natural seed of our
+ # pubkey-based serverid
+ vk_bytes = self._server_key.get_verifying_key_bytes()
+ seed = base32.b2a(vk_bytes)
+ self.write_config("permutation-seed", seed+"\n")
+ return seed.strip()
+
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
if not self.get_config("storage", "enabled", True, boolean=True):
return
readonly = self.get_config("storage", "readonly", False, boolean=True)
+ self._maybe_create_server_key()
+
storedir = os.path.join(self.basedir, self.STOREDIR)
data = self.get_config("storage", "reserved_space", None)
def _publish(res):
furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
- ri_name = RIStorageServer.__remote_name__
- self.introducer_client.publish(furl, "storage", ri_name)
+ ann = {"anonymous-storage-FURL": furl,
+ "permutation-seed-base32": self._init_permutation_seed(ss),
+ }
+ self.introducer_client.publish("storage", ann, self._server_key)
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="aLGBKw")
self.terminator.setServiceParent(self)
self.add_service(Uploader(helper_furl, self.stats_provider,
self.history))
- self.init_stub_client()
self.init_blacklist()
self.init_nodemaker()
def get_storage_broker(self):
return self.storage_broker
- def init_stub_client(self):
- def _publish(res):
- # we publish an empty object so that the introducer can count how
- # many clients are connected and see what versions they're
- # running.
- sc = StubClient()
- furl = self.tub.registerReference(sc)
- ri_name = RIStubClient.__remote_name__
- self.introducer_client.publish(furl, "stub_client", ri_name)
- d = self.when_tub_ready()
- d.addCallback(_publish)
- d.addErrback(log.err, facility="tahoe.init",
- level=log.BAD, umid="OEHq3g")
-
def init_blacklist(self):
fn = os.path.join(self.basedir, "access.blacklist")
self.blacklist = Blacklist(fn)
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
-class RIStubClient(RemoteInterface):
- """Each client publishes a service announcement for a dummy object called
- the StubClient. This object doesn't actually offer any services, but the
- announcement helps the Introducer keep track of which clients are
- subscribed (so the grid admin can keep track of things like the size of
- the grid and the client versions in use. This is the (empty)
- RemoteInterface for the StubClient."""
-
class RIBucketWriter(RemoteInterface):
""" Objects of this kind live on the server side. """
def write(offset=Offset, data=ShareData):
-from base64 import b32decode
+import time
from zope.interface import implements
from twisted.application import service
-from foolscap.api import Referenceable, SturdyRef, eventually
+from foolscap.api import Referenceable, eventually, RemoteInterface
from allmydata.interfaces import InsufficientVersionError
-from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
- IIntroducerClient
-from allmydata.util import log, idlib
-from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
+from allmydata.introducer.interfaces import IIntroducerClient, \
+ RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
+from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\
+ convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \
+ make_index, get_tubid_string_from_ann, get_tubid_string
+from allmydata.util import log
+from allmydata.util.rrefutil import add_version_to_remote_reference
+from allmydata.util.keyutil import BadSignatureError
+
+class WrapV2ClientInV1Interface(Referenceable): # for_v1
+ """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
+ can be attached to an old server."""
+ implements(RIIntroducerSubscriberClient_v1)
+
+ def __init__(self, original):
+ self.original = original
+ def remote_announce(self, announcements):
+ lp = self.original.log("received %d announcements (v1)" %
+ len(announcements))
+ anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
+ for ann_v1 in announcements])
+ return self.original.got_announcements(anns_v1, lp)
+
+ def remote_set_encoding_parameters(self, parameters):
+ self.original.remote_set_encoding_parameters(parameters)
+
+class RIStubClient(RemoteInterface): # for_v1
+ """Each client publishes a service announcement for a dummy object called
+ the StubClient. This object doesn't actually offer any services, but the
+ announcement helps the Introducer keep track of which clients are
+ subscribed (so the grid admin can keep track of things like the size of
+ the grid and the client versions in use. This is the (empty)
+ RemoteInterface for the StubClient."""
+
+class StubClient(Referenceable): # for_v1
+ implements(RIStubClient)
+
+V1 = "http://allmydata.org/tahoe/protocols/introducer/v1"
+V2 = "http://allmydata.org/tahoe/protocols/introducer/v2"
class IntroducerClient(service.Service, Referenceable):
- implements(RIIntroducerSubscriberClient, IIntroducerClient)
+ implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
def __init__(self, tub, introducer_furl,
- nickname, my_version, oldest_supported):
+ nickname, my_version, oldest_supported,
+ app_versions):
self._tub = tub
self.introducer_furl = introducer_furl
assert type(nickname) is unicode
- self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
+ self._nickname = nickname
self._my_version = my_version
self._oldest_supported = oldest_supported
+ self._app_versions = app_versions
- self._published_announcements = set()
+ self._my_subscriber_info = { "version": 0,
+ "nickname": self._nickname,
+ "app-versions": self._app_versions,
+ "my-version": self._my_version,
+ "oldest-supported": self._oldest_supported,
+ }
+ self._stub_client = None # for_v1
+ self._stub_client_furl = None
+
+ self._published_announcements = {}
+ self._canary = Referenceable()
self._publisher = None
# _current_announcements remembers one announcement per
# (servicename,serverid) pair. Anything that arrives with the same
- # pair will displace the previous one. This stores unpacked
- # announcement dictionaries, which can be compared for equality to
- # distinguish re-announcement from updates. It also provides memory
- # for clients who subscribe after startup.
+ # pair will displace the previous one. This stores tuples of
+ # (unpacked announcement dictionary, verifyingkey, rxtime). The ann
+ # dicts can be compared for equality to distinguish re-announcement
+ # from updates. It also provides memory for clients who subscribe
+ # after startup.
self._current_announcements = {}
self.encoding_parameters = None
"new_announcement": 0,
"outbound_message": 0,
}
+ self._debug_outstanding = 0
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
def startService(self):
service.Service.startService(self)
def _got_versioned_introducer(self, publisher):
self.log("got introducer version: %s" % (publisher.version,))
- # we require a V1 introducer
- needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
- if needed not in publisher.version:
- raise InsufficientVersionError(needed, publisher.version)
+ # we require an introducer that speaks at least one of (V1, V2)
+ if not (V1 in publisher.version or V2 in publisher.version):
+ raise InsufficientVersionError("V1 or V2", publisher.version)
self._publisher = publisher
publisher.notifyOnDisconnect(self._disconnected)
self._maybe_publish()
def log(self, *args, **kwargs):
if "facility" not in kwargs:
- kwargs["facility"] = "tahoe.introducer"
+ kwargs["facility"] = "tahoe.introducer.client"
return log.msg(*args, **kwargs)
-
- def publish(self, furl, service_name, remoteinterface_name):
- assert type(self._nickname_utf8) is str # we always send UTF-8
- ann = (furl, service_name, remoteinterface_name,
- self._nickname_utf8, self._my_version, self._oldest_supported)
- self._published_announcements.add(ann)
- self._maybe_publish()
-
def subscribe_to(self, service_name, cb, *args, **kwargs):
self._local_subscribers.append( (service_name,cb,args,kwargs) )
self._subscribed_service_names.add(service_name)
self._maybe_subscribe()
- for (servicename,nodeid),ann_d in self._current_announcements.items():
+ for index,(ann,key_s,when) in self._current_announcements.items():
+ servicename = index[0]
if servicename == service_name:
- eventually(cb, nodeid, ann_d)
+ eventually(cb, key_s, ann, *args, **kwargs)
def _maybe_subscribe(self):
if not self._publisher:
level=log.NOISY)
return
for service_name in self._subscribed_service_names:
- if service_name not in self._subscriptions:
- # there is a race here, but the subscription desk ignores
- # duplicate requests.
- self._subscriptions.add(service_name)
- d = self._publisher.callRemote("subscribe", self, service_name)
- d.addErrback(trap_deadref)
- d.addErrback(log.err, format="server errored during subscribe",
- facility="tahoe.introducer",
- level=log.WEIRD, umid="2uMScQ")
+ if service_name in self._subscriptions:
+ continue
+ self._subscriptions.add(service_name)
+ if V2 in self._publisher.version:
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("subscribe_v2",
+ self, service_name,
+ self._my_subscriber_info)
+ d.addBoth(self._debug_retired)
+ else:
+ d = self._subscribe_handle_v1(service_name) # for_v1
+ d.addErrback(log.err, facility="tahoe.introducer.client",
+ level=log.WEIRD, umid="2uMScQ")
+
+ def _subscribe_handle_v1(self, service_name): # for_v1
+ # they don't speak V2: must be a v1 introducer. Fall back to the v1
+ # 'subscribe' method, using a client adapter.
+ ca = WrapV2ClientInV1Interface(self)
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("subscribe", ca, service_name)
+ d.addBoth(self._debug_retired)
+ # We must also publish an empty 'stub_client' object, so the
+ # introducer can count how many clients are connected and see what
+ # versions they're running.
+ if not self._stub_client_furl:
+ self._stub_client = sc = StubClient()
+ self._stub_client_furl = self._tub.registerReference(sc)
+ def _publish_stub_client(ignored):
+ furl = self._stub_client_furl
+ self.publish("stub_client",
+ { "anonymous-storage-FURL": furl,
+ "permutation-seed-base32": get_tubid_string(furl),
+ })
+ d.addCallback(_publish_stub_client)
+ return d
+
+ def create_announcement(self, service_name, ann, signing_key):
+ full_ann = { "version": 0,
+ "nickname": self._nickname,
+ "app-versions": self._app_versions,
+ "my-version": self._my_version,
+ "oldest-supported": self._oldest_supported,
+
+ "service-name": service_name,
+ }
+ full_ann.update(ann)
+ return sign_to_foolscap(full_ann, signing_key)
+
+ def publish(self, service_name, ann, signing_key=None):
+ ann_t = self.create_announcement(service_name, ann, signing_key)
+ self._published_announcements[service_name] = ann_t
+ self._maybe_publish()
def _maybe_publish(self):
if not self._publisher:
self.log("want to publish, but no introducer yet", level=log.NOISY)
return
# this re-publishes everything. The Introducer ignores duplicates
- for ann in self._published_announcements:
+ for ann_t in self._published_announcements.values():
self._debug_counts["outbound_message"] += 1
- d = self._publisher.callRemote("publish", ann)
- d.addErrback(trap_deadref)
- d.addErrback(log.err,
- format="server errored during publish %(ann)s",
- ann=ann, facility="tahoe.introducer",
+ if V2 in self._publisher.version:
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("publish_v2", ann_t,
+ self._canary)
+ d.addBoth(self._debug_retired)
+ else:
+ d = self._handle_v1_publisher(ann_t) # for_v1
+ d.addErrback(log.err, ann_t=ann_t,
+ facility="tahoe.introducer.client",
level=log.WEIRD, umid="xs9pVQ")
+ def _handle_v1_publisher(self, ann_t): # for_v1
+ # they don't speak V2, so fall back to the old 'publish' method
+ # (which takes an unsigned tuple of bytestrings)
+ self.log("falling back to publish_v1",
+ level=log.UNUSUAL, umid="9RCT1A")
+ ann_v1 = convert_announcement_v2_to_v1(ann_t)
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("publish", ann_v1)
+ d.addBoth(self._debug_retired)
+ return d
- def remote_announce(self, announcements):
- self.log("received %d announcements" % len(announcements))
+ def remote_announce_v2(self, announcements):
+ lp = self.log("received %d announcements (v2)" % len(announcements))
+ return self.got_announcements(announcements, lp)
+
+ def got_announcements(self, announcements, lp=None):
+ # this is the common entry point for both v1 and v2 announcements
self._debug_counts["inbound_message"] += 1
- for ann in announcements:
+ for ann_t in announcements:
try:
- self._process_announcement(ann)
- except:
- log.err(format="unable to process announcement %(ann)s",
- ann=ann)
- # Don't let a corrupt announcement prevent us from processing
- # the remaining ones. Don't return an error to the server,
- # since they'd just ignore it anyways.
- pass
-
- def _process_announcement(self, ann):
+ # this might raise UnknownKeyError or bad-sig error
+ ann, key_s = unsign_from_foolscap(ann_t)
+ # key is "v0-base32abc123"
+ except BadSignatureError:
+ self.log("bad signature on inbound announcement: %s" % (ann_t,),
+ parent=lp, level=log.WEIRD, umid="ZAU15Q")
+ # process other announcements that arrived with the bad one
+ continue
+
+ self._process_announcement(ann, key_s)
+
+ def _process_announcement(self, ann, key_s):
self._debug_counts["inbound_announcement"] += 1
- (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+ service_name = str(ann["service-name"])
if service_name not in self._subscribed_service_names:
self.log("announcement for a service we don't care about [%s]"
% (service_name,), level=log.UNUSUAL, umid="dIpGNA")
self._debug_counts["wrong_service"] += 1
return
- self.log("announcement for [%s]: %s" % (service_name, ann),
- umid="BoKEag")
- assert type(furl) is str
- assert type(service_name) is str
- assert type(ri_name) is str
- assert type(nickname_utf8) is str
- nickname = nickname_utf8.decode("utf-8")
- assert type(nickname) is unicode
- assert type(ver) is str
- assert type(oldest) is str
-
- nodeid = b32decode(SturdyRef(furl).tubID.upper())
- nodeid_s = idlib.shortnodeid_b2a(nodeid)
-
- ann_d = { "version": 0,
- "service-name": service_name,
-
- "FURL": furl,
- "nickname": nickname,
- "app-versions": {}, # need #466 and v2 introducer
- "my-version": ver,
- "oldest-supported": oldest,
- }
-
- index = (service_name, nodeid)
- if self._current_announcements.get(index, None) == ann_d:
- self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
- service=service_name, nodeid=nodeid_s,
- level=log.UNUSUAL, umid="B1MIdA")
+ # for ASCII values, simplejson might give us unicode *or* bytes
+ if "nickname" in ann and isinstance(ann["nickname"], str):
+ ann["nickname"] = unicode(ann["nickname"])
+ nick_s = ann.get("nickname",u"").encode("utf-8")
+ lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
+ nick=nick_s, svc=service_name, ann=ann, umid="BoKEag")
+
+ # how do we describe this node in the logs?
+ desc_bits = []
+ if key_s:
+ desc_bits.append("serverid=" + key_s[:20])
+ if "anonymous-storage-FURL" in ann:
+ tubid_s = get_tubid_string_from_ann(ann)
+ desc_bits.append("tubid=" + tubid_s[:8])
+ description = "/".join(desc_bits)
+
+ # the index is used to track duplicates
+ index = make_index(ann, key_s)
+
+ # is this announcement a duplicate?
+ if (index in self._current_announcements
+ and self._current_announcements[index][0] == ann):
+ self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring",
+ service=service_name, description=description,
+ parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
self._debug_counts["duplicate_announcement"] += 1
return
+ # does it update an existing one?
if index in self._current_announcements:
self._debug_counts["update"] += 1
+ self.log("replacing old announcement: %s" % (ann,),
+ parent=lp2, level=log.NOISY, umid="wxwgIQ")
else:
self._debug_counts["new_announcement"] += 1
+ self.log("new announcement[%s]" % service_name,
+ parent=lp2, level=log.NOISY)
- self._current_announcements[index] = ann_d
+ self._current_announcements[index] = (ann, key_s, time.time())
# note: we never forget an index, but we might update its value
for (service_name2,cb,args,kwargs) in self._local_subscribers:
if service_name2 == service_name:
- eventually(cb, nodeid, ann_d, *args, **kwargs)
+ eventually(cb, key_s, ann, *args, **kwargs)
def remote_set_encoding_parameters(self, parameters):
self.encoding_parameters = parameters
--- /dev/null
+
+import re, simplejson
+from allmydata.util import keyutil, base32
+
+def make_index(ann, key_s):
+ """Return something that can be used as an index (e.g. a tuple of
+ strings), such that two messages that refer to the same 'thing' will have
+ the same index. This is a tuple of (service-name, signing-key, None) for
+ signed announcements, or (service-name, None, tubid) for unsigned
+ announcements."""
+
+ service_name = str(ann["service-name"])
+ if key_s:
+ return (service_name, key_s, None)
+ else:
+ tubid = get_tubid_string_from_ann(ann)
+ return (service_name, None, tubid)
+
+def get_tubid_string_from_ann(ann):
+ return get_tubid_string(str(ann.get("anonymous-storage-FURL")
+ or ann.get("FURL")))
+
+def get_tubid_string(furl):
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m
+ return m.group(1).lower()
+
+def convert_announcement_v1_to_v2(ann_t):
+ (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
+ assert type(furl) is str
+ assert type(service_name) is str
+ # ignore ri_name
+ assert type(nickname) is str
+ assert type(ver) is str
+ assert type(oldest) is str
+ ann = {"version": 0,
+ "nickname": nickname.decode("utf-8"),
+ "app-versions": {},
+ "my-version": ver,
+ "oldest-supported": oldest,
+
+ "service-name": service_name,
+ "anonymous-storage-FURL": furl,
+ "permutation-seed-base32": get_tubid_string(furl),
+ }
+ msg = simplejson.dumps(ann).encode("utf-8")
+ return (msg, None, None)
+
+def convert_announcement_v2_to_v1(ann_v2):
+ (msg, sig, pubkey) = ann_v2
+ ann = simplejson.loads(msg)
+ assert ann["version"] == 0
+ ann_t = (str(ann["anonymous-storage-FURL"]),
+ str(ann["service-name"]),
+ "remoteinterface-name is unused",
+ ann["nickname"].encode("utf-8"),
+ str(ann["my-version"]),
+ str(ann["oldest-supported"]),
+ )
+ return ann_t
+
+
+def sign_to_foolscap(ann, sk):
+ # return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future
+ # HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8),
+ # sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) .
+ msg = simplejson.dumps(ann).encode("utf-8")
+ if sk:
+ sig = "v0-"+base32.b2a(sk.sign(msg))
+ vk_bytes = sk.get_verifying_key_bytes()
+ ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes))
+ else:
+ ann_t = (msg, None, None)
+ return ann_t
+
+class UnknownKeyError(Exception):
+ pass
+
+def unsign_from_foolscap(ann_t):
+ (msg, sig_vs, claimed_key_vs) = ann_t
+ key_vs = None
+ if sig_vs and claimed_key_vs:
+ if not sig_vs.startswith("v0-"):
+ raise UnknownKeyError("only v0- signatures recognized")
+ if not claimed_key_vs.startswith("v0-"):
+ raise UnknownKeyError("only v0- keys recognized")
+ claimed_key = keyutil.parse_pubkey("pub-"+claimed_key_vs)
+ sig_bytes = base32.a2b(keyutil.remove_prefix(sig_vs, "v0-"))
+ claimed_key.verify(sig_bytes, msg)
+ key_vs = claimed_key_vs
+ ann = simplejson.loads(msg.decode("utf-8"))
+ return (ann, key_vs)
from zope.interface import Interface
from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
- RemoteInterface
+ RemoteInterface, Referenceable
+from old import RIIntroducerSubscriberClient_v1
FURL = StringConstraint(1000)
+# old introducer protocol (v1):
+#
# Announcements are (FURL, service_name, remoteinterface_name,
# nickname, my_version, oldest_supported)
# the (FURL, service_name, remoteinterface_name) refer to the service being
# incompatible peer. The second goal is to enable the development of
# backwards-compatibility code.
-Announcement = TupleOf(FURL, str, str,
- str, str, str)
+Announcement_v1 = TupleOf(FURL, str, str,
+ str, str, str)
-class RIIntroducerSubscriberClient(RemoteInterface):
- __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str)
+# or (bytes, none, none)
+Announcement_v2 = Any()
- def announce(announcements=SetOf(Announcement)):
+class RIIntroducerSubscriberClient_v2(RemoteInterface):
+ __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com"
+
+ def announce_v2(announcements=SetOf(Announcement_v2)):
"""I accept announcements from the publisher."""
return None
"""
return None
-# When Foolscap can handle multiple interfaces (Foolscap#17), the
-# full-powered introducer will implement both RIIntroducerPublisher and
-# RIIntroducerSubscriberService. Until then, we define
-# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
-# make everybody use that.
+SubscriberInfo = DictOf(str, Any())
-class RIIntroducerPublisher(RemoteInterface):
+class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
"""To publish a service to the world, connect to me and give me your
- announcement message. I will deliver a copy to all connected subscribers."""
- __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
-
- def publish(announcement=Announcement):
- # canary?
- return None
-
-class RIIntroducerSubscriberService(RemoteInterface):
- __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
-
- def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
- """Give me a subscriber reference, and I will call its new_peers()
- method will any announcements that match the desired service name. I
- will ignore duplicate subscriptions.
- """
- return None
-
-class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
- __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+ announcement message. I will deliver a copy to all connected subscribers.
+ To hear about services, connect to me and subscribe to a specific
+ service_name."""
+ __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
def get_version():
return DictOf(str, Any())
- def publish(announcement=Announcement):
+ def publish(announcement=Announcement_v1):
+ return None
+ def publish_v2(announcement=Announcement_v2, canary=Referenceable):
return None
- def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
+ def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+ return None
+ def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
+ service_name=str, subscriber_info=SubscriberInfo):
+ """Give me a subscriber reference, and I will call its announce_v2()
+ method with any announcements that match the desired service name. I
+ will ignore duplicate subscriptions. The subscriber_info dictionary
+ tells me about the subscriber, and is used for diagnostic/status
+ displays."""
return None
class IIntroducerClient(Interface):
publish their services to the rest of the world, and I help them learn
about services available on other nodes."""
- def publish(furl, service_name, remoteinterface_name):
- """Once you call this, I will tell the world that the Referenceable
- available at FURL is available to provide a service named
- SERVICE_NAME. The precise definition of the service being provided is
- identified by the Foolscap 'remote interface name' in the last
- parameter: this is supposed to be a globally-unique string that
- identifies the RemoteInterface that is implemented."""
+ def publish(service_name, ann, signing_key=None):
+ """Publish the given announcement dictionary (which must be
+ JSON-serializable), plus some additional keys, to the world.
+
+ Each announcement is characterized by a (service_name, serverid)
+ pair. When the server sees two announcements with the same pair, the
+ later one will replace the earlier one. The serverid is derived from
+ the signing_key, if present, otherwise it is derived from the
+ 'anonymous-storage-FURL' key.
+
+ If signing_key= is set to an instance of SigningKey, it will be
+ used to sign the announcement."""
def subscribe_to(service_name, callback, *args, **kwargs):
"""Call this if you will eventually want to use services with the
given SERVICE_NAME. This will prompt me to subscribe to announcements
of those services. Your callback will be invoked with at least two
- arguments: a serverid (binary string), and an announcement
- dictionary, followed by any additional callback args/kwargs you give
- me. I will run your callback for both new announcements and for
+ arguments: a pubkey and an announcement dictionary, followed by any
+ additional callback args/kwargs you gave me. The pubkey will be None
+ unless the announcement was signed by the corresponding pubkey, in
+ which case it will be a printable string like 'v0-base32..'.
+
+ I will run your callback for both new announcements and for
announcements that have changed, but you must be prepared to tolerate
duplicates.
- The announcement dictionary that I give you will have the following
- keys:
+ The announcement that I give you comes from some other client. It
+ will be a JSON-serializable dictionary which (by convention) is
+ expected to have at least the following keys:
version: 0
- service-name: str('storage')
-
- FURL: str(furl)
- remoteinterface-name: str(ri_name)
nickname: unicode
app-versions: {}
my-version: str
oldest-supported: str
- Note that app-version will be an empty dictionary until #466 is done
- and both the introducer and the remote client have been upgraded. For
- current (native) server types, the serverid will always be equal to
- the binary form of the FURL's tubid.
+ service-name: str('storage')
+ anonymous-storage-FURL: str(furl)
+
+ Note that app-version will be an empty dictionary if either the
+ publishing client or the Introducer are running older code.
"""
def connected_to_introducer():
--- /dev/null
+
+import time
+from base64 import b32decode
+from zope.interface import implements, Interface
+from twisted.application import service
+import allmydata
+from allmydata.interfaces import InsufficientVersionError
+from allmydata.util import log, idlib, rrefutil
+from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
+ RemoteInterface, Referenceable, eventually, SturdyRef
+FURL = StringConstraint(1000)
+
+# We keep a copy of the old introducer (both client and server) here to
+# support compatibility tests. The old client is supposed to handle the new
+# server, and new client is supposed to handle the old server.
+
+
+# Announcements are (FURL, service_name, remoteinterface_name,
+# nickname, my_version, oldest_supported)
+# the (FURL, service_name, remoteinterface_name) refer to the service being
+# announced. The (nickname, my_version, oldest_supported) refer to the
+# client as a whole. The my_version/oldest_supported strings can be parsed
+# by an allmydata.util.version.Version instance, and then compared. The
+# first goal is to make sure that nodes are not confused by speaking to an
+# incompatible peer. The second goal is to enable the development of
+# backwards-compatibility code.
+
+Announcement = TupleOf(FURL, str, str,
+ str, str, str)
+
+class RIIntroducerSubscriberClient_v1(RemoteInterface):
+ __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
+
+ def announce(announcements=SetOf(Announcement)):
+ """I accept announcements from the publisher."""
+ return None
+
+ def set_encoding_parameters(parameters=(int, int, int)):
+ """Advise the client of the recommended k-of-n encoding parameters
+ for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
+ is the total number of shares that will be created for any given
+ file, while 'k' is the number of shares that must be retrieved to
+ recover that file, and 'desired' is the minimum number of shares that
+ must be placed before the uploader will consider its job a success.
+ n/k is the expansion ratio, while k determines the robustness.
+
+ Introducers should specify 'n' according to the expected size of the
+ grid (there is no point to producing more shares than there are
+ peers), and k according to the desired reliability-vs-overhead goals.
+
+ Note that setting k=1 is equivalent to simple replication.
+ """
+ return None
+
+# When Foolscap can handle multiple interfaces (Foolscap#17), the
+# full-powered introducer will implement both RIIntroducerPublisher and
+# RIIntroducerSubscriberService. Until then, we define
+# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
+# make everybody use that.
+
+class RIIntroducerPublisher_v1(RemoteInterface):
+ """To publish a service to the world, connect to me and give me your
+ announcement message. I will deliver a copy to all connected subscribers."""
+ __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
+
+ def publish(announcement=Announcement):
+ # canary?
+ return None
+
+class RIIntroducerSubscriberService_v1(RemoteInterface):
+ __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
+
+ def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+ """Give me a subscriber reference, and I will call its new_peers()
+ method will any announcements that match the desired service name. I
+ will ignore duplicate subscriptions.
+ """
+ return None
+
+class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
+ __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
+ def get_version():
+ return DictOf(str, Any())
+ def publish(announcement=Announcement):
+ return None
+ def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
+ return None
+
+class IIntroducerClient(Interface):
+ """I provide service introduction facilities for a node. I help nodes
+ publish their services to the rest of the world, and I help them learn
+ about services available on other nodes."""
+
+ def publish(furl, service_name, remoteinterface_name):
+ """Once you call this, I will tell the world that the Referenceable
+ available at FURL is available to provide a service named
+ SERVICE_NAME. The precise definition of the service being provided is
+ identified by the Foolscap 'remote interface name' in the last
+ parameter: this is supposed to be a globally-unique string that
+ identifies the RemoteInterface that is implemented."""
+
+ def subscribe_to(service_name, callback, *args, **kwargs):
+ """Call this if you will eventually want to use services with the
+ given SERVICE_NAME. This will prompt me to subscribe to announcements
+ of those services. Your callback will be invoked with at least two
+ arguments: a serverid (binary string), and an announcement
+ dictionary, followed by any additional callback args/kwargs you give
+ me. I will run your callback for both new announcements and for
+ announcements that have changed, but you must be prepared to tolerate
+ duplicates.
+
+ The announcement dictionary that I give you will have the following
+ keys:
+
+ version: 0
+ service-name: str('storage')
+
+ FURL: str(furl)
+ remoteinterface-name: str(ri_name)
+ nickname: unicode
+ app-versions: {}
+ my-version: str
+ oldest-supported: str
+
+ Note that app-version will be an empty dictionary until #466 is done
+ and both the introducer and the remote client have been upgraded. For
+ current (native) server types, the serverid will always be equal to
+ the binary form of the FURL's tubid.
+ """
+
+ def connected_to_introducer():
+ """Returns a boolean, True if we are currently connected to the
+ introducer, False if not."""
+
+
+class IntroducerClient_v1(service.Service, Referenceable):
+ implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
+
+ def __init__(self, tub, introducer_furl,
+ nickname, my_version, oldest_supported):
+ self._tub = tub
+ self.introducer_furl = introducer_furl
+
+ assert type(nickname) is unicode
+ self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
+ self._my_version = my_version
+ self._oldest_supported = oldest_supported
+
+ self._published_announcements = set()
+
+ self._publisher = None
+
+ self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
+ self._subscribed_service_names = set()
+ self._subscriptions = set() # requests we've actually sent
+
+ # _current_announcements remembers one announcement per
+ # (servicename,serverid) pair. Anything that arrives with the same
+ # pair will displace the previous one. This stores unpacked
+ # announcement dictionaries, which can be compared for equality to
+ # distinguish re-announcement from updates. It also provides memory
+ # for clients who subscribe after startup.
+ self._current_announcements = {}
+
+ self.encoding_parameters = None
+
+ # hooks for unit tests
+ self._debug_counts = {
+ "inbound_message": 0,
+ "inbound_announcement": 0,
+ "wrong_service": 0,
+ "duplicate_announcement": 0,
+ "update": 0,
+ "new_announcement": 0,
+ "outbound_message": 0,
+ }
+ self._debug_outstanding = 0
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
+
+ def startService(self):
+ service.Service.startService(self)
+ self._introducer_error = None
+ rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
+ self._introducer_reconnector = rc
+ def connect_failed(failure):
+ self.log("Initial Introducer connection failed: perhaps it's down",
+ level=log.WEIRD, failure=failure, umid="c5MqUQ")
+ d = self._tub.getReference(self.introducer_furl)
+ d.addErrback(connect_failed)
+
+ def _got_introducer(self, publisher):
+ self.log("connected to introducer, getting versions")
+ default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+ { },
+ "application-version": "unknown: no get_version()",
+ }
+ d = rrefutil.add_version_to_remote_reference(publisher, default)
+ d.addCallback(self._got_versioned_introducer)
+ d.addErrback(self._got_error)
+
+ def _got_error(self, f):
+ # TODO: for the introducer, perhaps this should halt the application
+ self._introducer_error = f # polled by tests
+
+ def _got_versioned_introducer(self, publisher):
+ self.log("got introducer version: %s" % (publisher.version,))
+ # we require a V1 introducer
+ needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
+ if needed not in publisher.version:
+ raise InsufficientVersionError(needed, publisher.version)
+ self._publisher = publisher
+ publisher.notifyOnDisconnect(self._disconnected)
+ self._maybe_publish()
+ self._maybe_subscribe()
+
+ def _disconnected(self):
+ self.log("bummer, we've lost our connection to the introducer")
+ self._publisher = None
+ self._subscriptions.clear()
+
+ def log(self, *args, **kwargs):
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.introducer"
+ return log.msg(*args, **kwargs)
+
+
+ def publish(self, furl, service_name, remoteinterface_name):
+ assert type(self._nickname_utf8) is str # we always send UTF-8
+ ann = (furl, service_name, remoteinterface_name,
+ self._nickname_utf8, self._my_version, self._oldest_supported)
+ self._published_announcements.add(ann)
+ self._maybe_publish()
+
+ def subscribe_to(self, service_name, cb, *args, **kwargs):
+ self._local_subscribers.append( (service_name,cb,args,kwargs) )
+ self._subscribed_service_names.add(service_name)
+ self._maybe_subscribe()
+ for (servicename,nodeid),ann_d in self._current_announcements.items():
+ if servicename == service_name:
+ eventually(cb, nodeid, ann_d)
+
+ def _maybe_subscribe(self):
+ if not self._publisher:
+ self.log("want to subscribe, but no introducer yet",
+ level=log.NOISY)
+ return
+ for service_name in self._subscribed_service_names:
+ if service_name not in self._subscriptions:
+ # there is a race here, but the subscription desk ignores
+ # duplicate requests.
+ self._subscriptions.add(service_name)
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("subscribe", self, service_name)
+ d.addBoth(self._debug_retired)
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err, format="server errored during subscribe",
+ facility="tahoe.introducer",
+ level=log.WEIRD, umid="2uMScQ")
+
+ def _maybe_publish(self):
+ if not self._publisher:
+ self.log("want to publish, but no introducer yet", level=log.NOISY)
+ return
+ # this re-publishes everything. The Introducer ignores duplicates
+ for ann in self._published_announcements:
+ self._debug_counts["outbound_message"] += 1
+ self._debug_outstanding += 1
+ d = self._publisher.callRemote("publish", ann)
+ d.addBoth(self._debug_retired)
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err,
+ format="server errored during publish %(ann)s",
+ ann=ann, facility="tahoe.introducer",
+ level=log.WEIRD, umid="xs9pVQ")
+
+
+
+ def remote_announce(self, announcements):
+ self.log("received %d announcements" % len(announcements))
+ self._debug_counts["inbound_message"] += 1
+ for ann in announcements:
+ try:
+ self._process_announcement(ann)
+ except:
+ log.err(format="unable to process announcement %(ann)s",
+ ann=ann)
+ # Don't let a corrupt announcement prevent us from processing
+ # the remaining ones. Don't return an error to the server,
+ # since they'd just ignore it anyways.
+ pass
+
+ def _process_announcement(self, ann):
+ self._debug_counts["inbound_announcement"] += 1
+ (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
+ if service_name not in self._subscribed_service_names:
+ self.log("announcement for a service we don't care about [%s]"
+ % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
+ self._debug_counts["wrong_service"] += 1
+ return
+ self.log("announcement for [%s]: %s" % (service_name, ann),
+ umid="BoKEag")
+ assert type(furl) is str
+ assert type(service_name) is str
+ assert type(ri_name) is str
+ assert type(nickname_utf8) is str
+ nickname = nickname_utf8.decode("utf-8")
+ assert type(nickname) is unicode
+ assert type(ver) is str
+ assert type(oldest) is str
+
+ nodeid = b32decode(SturdyRef(furl).tubID.upper())
+ nodeid_s = idlib.shortnodeid_b2a(nodeid)
+
+ ann_d = { "version": 0,
+ "service-name": service_name,
+
+ "FURL": furl,
+ "nickname": nickname,
+ "app-versions": {}, # need #466 and v2 introducer
+ "my-version": ver,
+ "oldest-supported": oldest,
+ }
+
+ index = (service_name, nodeid)
+ if self._current_announcements.get(index, None) == ann_d:
+ self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
+ service=service_name, nodeid=nodeid_s,
+ level=log.UNUSUAL, umid="B1MIdA")
+ self._debug_counts["duplicate_announcement"] += 1
+ return
+ if index in self._current_announcements:
+ self._debug_counts["update"] += 1
+ else:
+ self._debug_counts["new_announcement"] += 1
+
+ self._current_announcements[index] = ann_d
+ # note: we never forget an index, but we might update its value
+
+ for (service_name2,cb,args,kwargs) in self._local_subscribers:
+ if service_name2 == service_name:
+ eventually(cb, nodeid, ann_d, *args, **kwargs)
+
+ def remote_set_encoding_parameters(self, parameters):
+ self.encoding_parameters = parameters
+
+ def connected_to_introducer(self):
+ return bool(self._publisher)
+
+class IntroducerService_v1(service.MultiService, Referenceable):
+ implements(RIIntroducerPublisherAndSubscriberService_v1)
+ name = "introducer"
+ VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
+ { },
+ "application-version": str(allmydata.__full_version__),
+ }
+
+ def __init__(self, basedir="."):
+ service.MultiService.__init__(self)
+ self.introducer_url = None
+ # 'index' is (service_name, tubid)
+ self._announcements = {} # dict of index -> (announcement, timestamp)
+ self._subscribers = {} # dict of (rref->timestamp) dicts
+ self._debug_counts = {"inbound_message": 0,
+ "inbound_duplicate": 0,
+ "inbound_update": 0,
+ "outbound_message": 0,
+ "outbound_announcements": 0,
+ "inbound_subscribe": 0}
+ self._debug_outstanding = 0
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
+
+ def log(self, *args, **kwargs):
+ if "facility" not in kwargs:
+ kwargs["facility"] = "tahoe.introducer"
+ return log.msg(*args, **kwargs)
+
+ def get_announcements(self):
+ return self._announcements
+ def get_subscribers(self):
+ return self._subscribers
+
+ def remote_get_version(self):
+ return self.VERSION
+
+ def remote_publish(self, announcement):
+ try:
+ self._publish(announcement)
+ except:
+ log.err(format="Introducer.remote_publish failed on %(ann)s",
+ ann=announcement, level=log.UNUSUAL, umid="620rWA")
+ raise
+
+ def _publish(self, announcement):
+ self._debug_counts["inbound_message"] += 1
+ self.log("introducer: announcement published: %s" % (announcement,) )
+ (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+ #print "PUB", service_name, nickname_utf8
+
+ nodeid = b32decode(SturdyRef(furl).tubID.upper())
+ index = (service_name, nodeid)
+
+ if index in self._announcements:
+ (old_announcement, timestamp) = self._announcements[index]
+ if old_announcement == announcement:
+ self.log("but we already knew it, ignoring", level=log.NOISY)
+ self._debug_counts["inbound_duplicate"] += 1
+ return
+ else:
+ self.log("old announcement being updated", level=log.NOISY)
+ self._debug_counts["inbound_update"] += 1
+ self._announcements[index] = (announcement, time.time())
+
+ for s in self._subscribers.get(service_name, []):
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += 1
+ self._debug_outstanding += 1
+ d = s.callRemote("announce", set([announcement]))
+ d.addBoth(self._debug_retired)
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err,
+ format="subscriber errored on announcement %(ann)s",
+ ann=announcement, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="jfGMXQ")
+
+ def remote_subscribe(self, subscriber, service_name):
+ self.log("introducer: subscription[%s] request at %s" % (service_name,
+ subscriber))
+ self._debug_counts["inbound_subscribe"] += 1
+ if service_name not in self._subscribers:
+ self._subscribers[service_name] = {}
+ subscribers = self._subscribers[service_name]
+ if subscriber in subscribers:
+ self.log("but they're already subscribed, ignoring",
+ level=log.UNUSUAL)
+ return
+ subscribers[subscriber] = time.time()
+ def _remove():
+ self.log("introducer: unsubscribing[%s] %s" % (service_name,
+ subscriber))
+ subscribers.pop(subscriber, None)
+ subscriber.notifyOnDisconnect(_remove)
+
+ announcements = set(
+ [ ann
+ for (sn2,nodeid),(ann,when) in self._announcements.items()
+ if sn2 == service_name] )
+
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += len(announcements)
+ self._debug_outstanding += 1
+ d = subscriber.callRemote("announce", announcements)
+ d.addBoth(self._debug_retired)
+ d.addErrback(rrefutil.trap_deadref)
+ d.addErrback(log.err,
+ format="subscriber errored during subscribe %(anns)s",
+ anns=announcements, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="1XChxA")
import time, os.path
-from base64 import b32decode
from zope.interface import implements
from twisted.application import service
-from foolscap.api import Referenceable, SturdyRef
+from foolscap.api import Referenceable
import allmydata
from allmydata import node
-from allmydata.util import log, rrefutil
+from allmydata.util import log
from allmydata.introducer.interfaces import \
- RIIntroducerPublisherAndSubscriberService
+ RIIntroducerPublisherAndSubscriberService_v2
+from allmydata.introducer.common import convert_announcement_v1_to_v2, \
+ convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \
+ get_tubid_string_from_ann
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
def _publish(res):
self.introducer_url = self.tub.registerReference(introducerservice,
"introducer")
- self.log(" introducer is at %s" % self.introducer_url)
+ self.log(" introducer is at %s" % self.introducer_url,
+ umid="qF2L9A")
self.write_config("introducer.furl", self.introducer_url + "\n")
d.addCallback(_publish)
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="UaNs9A")
def init_web(self, webport):
- self.log("init_web(webport=%s)", args=(webport,))
+ self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
from allmydata.webish import IntroducerWebishServer
nodeurl_path = os.path.join(self.basedir, "node.url")
ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
self.add_service(ws)
+class WrapV1SubscriberInV2Interface: # for_v1
+ """I wrap a RemoteReference that points at an old v1 subscriber, enabling
+ it to be treated like a v2 subscriber.
+ """
+
+ def __init__(self, original):
+ self.original = original
+ def __eq__(self, them):
+ return self.original == them
+ def __ne__(self, them):
+ return self.original != them
+ def __hash__(self):
+ return hash(self.original)
+ def getRemoteTubID(self):
+ return self.original.getRemoteTubID()
+ def getSturdyRef(self):
+ return self.original.getSturdyRef()
+ def getPeer(self):
+ return self.original.getPeer()
+ def callRemote(self, methname, *args, **kwargs):
+ m = getattr(self, "wrap_" + methname)
+ return m(*args, **kwargs)
+ def wrap_announce_v2(self, announcements):
+ anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
+ return self.original.callRemote("announce", set(anns_v1))
+ def wrap_set_encoding_parameters(self, parameters):
+ # note: unused
+ return self.original.callRemote("set_encoding_parameters", parameters)
+ def notifyOnDisconnect(self, *args, **kwargs):
+ return self.original.notifyOnDisconnect(*args, **kwargs)
+
class IntroducerService(service.MultiService, Referenceable):
- implements(RIIntroducerPublisherAndSubscriberService)
+ implements(RIIntroducerPublisherAndSubscriberService_v2)
name = "introducer"
- VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
- { },
+ # v1 is the original protocol, supported since 1.0 (but only advertised
+ # starting in 1.3). v2 is the new signed protocol, supported after 1.9
+ VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { },
+ "http://allmydata.org/tahoe/protocols/introducer/v2": { },
"application-version": str(allmydata.__full_version__),
}
def __init__(self, basedir="."):
service.MultiService.__init__(self)
self.introducer_url = None
- # 'index' is (service_name, tubid)
- self._announcements = {} # dict of index -> (announcement, timestamp)
- self._subscribers = {} # dict of (rref->timestamp) dicts
+ # 'index' is (service_name, key_s, tubid), where key_s or tubid is
+ # None
+ self._announcements = {} # dict of index ->
+ # (ann_t, canary, ann, timestamp)
+
+ # ann (the announcement dictionary) is cleaned up: nickname is always
+ # unicode, servicename is always ascii, etc, even though
+ # simplejson.loads sometimes returns either
+
+ # self._subscribers is a dict mapping servicename to subscriptions
+ # 'subscriptions' is a dict mapping rref to a subscription
+ # 'subscription' is a tuple of (subscriber_info, timestamp)
+ # 'subscriber_info' is a dict, provided directly for v2 clients, or
+ # synthesized for v1 clients. The expected keys are:
+ # version, nickname, app-versions, my-version, oldest-supported
+ self._subscribers = {}
+
+ # self._stub_client_announcements contains the information provided
+ # by v1 clients. We stash this so we can match it up with their
+ # subscriptions.
+ self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
+
self._debug_counts = {"inbound_message": 0,
"inbound_duplicate": 0,
"inbound_update": 0,
"outbound_message": 0,
"outbound_announcements": 0,
"inbound_subscribe": 0}
+ self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface
+
+ def _debug_retired(self, res):
+ self._debug_outstanding -= 1
+ return res
def log(self, *args, **kwargs):
if "facility" not in kwargs:
- kwargs["facility"] = "tahoe.introducer"
+ kwargs["facility"] = "tahoe.introducer.server"
return log.msg(*args, **kwargs)
def get_announcements(self):
return self._announcements
def get_subscribers(self):
- return self._subscribers
+ """Return a list of (service_name, when, subscriber_info, rref) for
+ all subscribers. subscriber_info is a dict with the following keys:
+ version, nickname, app-versions, my-version, oldest-supported"""
+ s = []
+ for service_name, subscriptions in self._subscribers.items():
+ for rref,(subscriber_info,when) in subscriptions.items():
+ s.append( (service_name, when, subscriber_info, rref) )
+ return s
def remote_get_version(self):
return self.VERSION
- def remote_publish(self, announcement):
+ def remote_publish(self, ann_t): # for_v1
+ lp = self.log("introducer: old (v1) announcement published: %s"
+ % (ann_t,), umid="6zGOIw")
+ ann_v2 = convert_announcement_v1_to_v2(ann_t)
+ return self.publish(ann_v2, None, lp)
+
+ def remote_publish_v2(self, ann_t, canary):
+ lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
+ return self.publish(ann_t, canary, lp)
+
+ def publish(self, ann_t, canary, lp):
try:
- self._publish(announcement)
+ self._publish(ann_t, canary, lp)
except:
log.err(format="Introducer.remote_publish failed on %(ann)s",
- ann=announcement, level=log.UNUSUAL, umid="620rWA")
+ ann=ann_t,
+ level=log.UNUSUAL, parent=lp, umid="620rWA")
raise
- def _publish(self, announcement):
+ def _publish(self, ann_t, canary, lp):
self._debug_counts["inbound_message"] += 1
- self.log("introducer: announcement published: %s" % (announcement,) )
- (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
+ self.log("introducer: announcement published: %s" % (ann_t,),
+ umid="wKHgCw")
+ ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError
+ index = make_index(ann, key)
- nodeid = b32decode(SturdyRef(furl).tubID.upper())
- index = (service_name, nodeid)
+ service_name = str(ann["service-name"])
+ if service_name == "stub_client": # for_v1
+ self._attach_stub_client(ann, lp)
+ return
- if index in self._announcements:
- (old_announcement, timestamp) = self._announcements[index]
- if old_announcement == announcement:
- self.log("but we already knew it, ignoring", level=log.NOISY)
+ old = self._announcements.get(index)
+ if old:
+ (old_ann_t, canary, old_ann, timestamp) = old
+ if old_ann == ann:
+ self.log("but we already knew it, ignoring", level=log.NOISY,
+ umid="myxzLw")
self._debug_counts["inbound_duplicate"] += 1
return
else:
- self.log("old announcement being updated", level=log.NOISY)
+ self.log("old announcement being updated", level=log.NOISY,
+ umid="304r9g")
self._debug_counts["inbound_update"] += 1
- self._announcements[index] = (announcement, time.time())
+ self._announcements[index] = (ann_t, canary, ann, time.time())
+ #if canary:
+ # canary.notifyOnDisconnect ...
+ # use a CanaryWatcher? with cw.is_connected()?
+ # actually we just want foolscap to give rref.is_connected(), since
+ # this is only for the status display
for s in self._subscribers.get(service_name, []):
self._debug_counts["outbound_message"] += 1
self._debug_counts["outbound_announcements"] += 1
- d = s.callRemote("announce", set([announcement]))
- d.addErrback(rrefutil.trap_deadref)
+ self._debug_outstanding += 1
+ d = s.callRemote("announce_v2", set([ann_t]))
+ d.addBoth(self._debug_retired)
d.addErrback(log.err,
format="subscriber errored on announcement %(ann)s",
- ann=announcement, facility="tahoe.introducer",
+ ann=ann_t, facility="tahoe.introducer",
level=log.UNUSUAL, umid="jfGMXQ")
- def remote_subscribe(self, subscriber, service_name):
- self.log("introducer: subscription[%s] request at %s" % (service_name,
- subscriber))
+ def _attach_stub_client(self, ann, lp):
+ # There might be a v1 subscriber for whom this is a stub_client.
+ # We might have received the subscription before the stub_client
+ # announcement, in which case we now need to fix up the record in
+ # self._subscriptions .
+
+ # record it for later, in case the stub_client arrived before the
+ # subscription
+ subscriber_info = self._get_subscriber_info_from_ann(ann)
+ ann_tubid = get_tubid_string_from_ann(ann)
+ self._stub_client_announcements[ann_tubid] = subscriber_info
+
+ lp2 = self.log("stub_client announcement, "
+ "looking for matching subscriber",
+ parent=lp, level=log.NOISY, umid="BTywDg")
+
+ for sn in self._subscribers:
+ s = self._subscribers[sn]
+ for (subscriber, info) in s.items():
+ # we correlate these by looking for a subscriber whose tubid
+ # matches this announcement
+ sub_tubid = subscriber.getRemoteTubID()
+ if sub_tubid == ann_tubid:
+ self.log(format="found a match, nodeid=%(nodeid)s",
+ nodeid=sub_tubid,
+ level=log.NOISY, parent=lp2, umid="xsWs1A")
+ # found a match. Does it need info?
+ if not info[0]:
+ self.log(format="replacing info",
+ level=log.NOISY, parent=lp2, umid="m5kxwA")
+ # yup
+ s[subscriber] = (subscriber_info, info[1])
+ # and we don't remember or announce stub_clients beyond what we
+ # need to get the subscriber_info set up
+
+ def _get_subscriber_info_from_ann(self, ann): # for_v1
+ sinfo = { "version": ann["version"],
+ "nickname": ann["nickname"],
+ "app-versions": ann["app-versions"],
+ "my-version": ann["my-version"],
+ "oldest-supported": ann["oldest-supported"],
+ }
+ return sinfo
+
+ def remote_subscribe(self, subscriber, service_name): # for_v1
+ self.log("introducer: old (v1) subscription[%s] request at %s"
+ % (service_name, subscriber), umid="hJlGUg")
+ return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber),
+ service_name, None)
+
+ def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
+ self.log("introducer: subscription[%s] request at %s"
+ % (service_name, subscriber), umid="U3uzLg")
+ return self.add_subscriber(subscriber, service_name, subscriber_info)
+
+ def add_subscriber(self, subscriber, service_name, subscriber_info):
self._debug_counts["inbound_subscribe"] += 1
if service_name not in self._subscribers:
self._subscribers[service_name] = {}
subscribers = self._subscribers[service_name]
if subscriber in subscribers:
self.log("but they're already subscribed, ignoring",
- level=log.UNUSUAL)
+ level=log.UNUSUAL, umid="Sy9EfA")
return
- subscribers[subscriber] = time.time()
+
+ if not subscriber_info: # for_v1
+ # v1 clients don't provide subscriber_info, but they should
+ # publish a 'stub client' record which contains the same
+ # information. If we've already received this, it will be in
+ # self._stub_client_announcements
+ tubid = subscriber.getRemoteTubID()
+ if tubid in self._stub_client_announcements:
+ subscriber_info = self._stub_client_announcements[tubid]
+
+ subscribers[subscriber] = (subscriber_info, time.time())
def _remove():
self.log("introducer: unsubscribing[%s] %s" % (service_name,
- subscriber))
+ subscriber),
+ umid="vYGcJg")
subscribers.pop(subscriber, None)
subscriber.notifyOnDisconnect(_remove)
- announcements = set(
- [ ann
- for (sn2,nodeid),(ann,when) in self._announcements.items()
- if sn2 == service_name] )
-
- self._debug_counts["outbound_message"] += 1
- self._debug_counts["outbound_announcements"] += len(announcements)
- d = subscriber.callRemote("announce", announcements)
- d.addErrback(rrefutil.trap_deadref)
- d.addErrback(log.err,
- format="subscriber errored during subscribe %(anns)s",
- anns=announcements, facility="tahoe.introducer",
- level=log.UNUSUAL, umid="mtZepQ")
+ # now tell them about any announcements they're interested in
+ announcements = set( [ ann_t
+ for idx,(ann_t,canary,ann,when)
+ in self._announcements.items()
+ if idx[0] == service_name] )
+ if announcements:
+ self._debug_counts["outbound_message"] += 1
+ self._debug_counts["outbound_announcements"] += len(announcements)
+ self._debug_outstanding += 1
+ d = subscriber.callRemote("announce_v2", announcements)
+ d.addBoth(self._debug_retired)
+ d.addErrback(log.err,
+ format="subscriber errored during subscribe %(anns)s",
+ anns=announcements, facility="tahoe.introducer",
+ level=log.UNUSUAL, umid="mtZepQ")
+ return d
# TODO: merge this with allmydata.get_package_versions
return dict(app_versions.versions)
+ def get_config_from_file(self, name, required=False):
+ """Get the (string) contents of a config file, or None if the file
+ did not exist. If required=True, raise an exception rather than
+ returning None. Any leading or trailing whitespace will be stripped
+ from the data."""
+ fn = os.path.join(self.basedir, name)
+ try:
+ return fileutil.read(fn).strip()
+ except EnvironmentError:
+ if not required:
+ return None
+ raise
+
def write_private_config(self, name, value):
"""Write the (string) contents of a private config file (which is a
config file that resides within the subdirectory named 'private'), and
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
+ def have_shares(self):
+ # quick test to decide if we need to commit to an implicit
+ # permutation-seed or if we should use a new one
+ return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
+
def add_bucket_counter(self):
statefile = os.path.join(self.storedir, "bucket_counter.state")
self.bucket_counter = BucketCountingCrawler(self, statefile)
# 6: implement other sorts of IStorageClient classes: S3, etc
-import time
+import re, time
from zope.interface import implements, Interface
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
-from allmydata.util import idlib, log
+from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
self.introducer_client = None
# these two are used in unit tests
- def test_add_rref(self, serverid, rref):
- s = NativeStorageServer(serverid, {})
+ def test_add_rref(self, serverid, rref, ann):
+ s = NativeStorageServer(serverid, ann.copy())
s.rref = rref
self.servers[serverid] = s
self.introducer_client = ic = introducer_client
ic.subscribe_to("storage", self._got_announcement)
- def _got_announcement(self, serverid, ann_d):
- precondition(isinstance(serverid, str), serverid)
- precondition(len(serverid) == 20, serverid)
- assert ann_d["service-name"] == "storage"
+ def _got_announcement(self, key_s, ann):
+ if key_s is not None:
+ precondition(isinstance(key_s, str), key_s)
+ precondition(key_s.startswith("v0-"), key_s)
+ assert ann["service-name"] == "storage"
+ s = NativeStorageServer(key_s, ann)
+ serverid = s.get_serverid()
old = self.servers.get(serverid)
if old:
- if old.get_announcement() == ann_d:
+ if old.get_announcement() == ann:
return # duplicate
# replacement
del self.servers[serverid]
old.stop_connecting()
# now we forget about them and start using the new one
- dsc = NativeStorageServer(serverid, ann_d)
- self.servers[serverid] = dsc
- dsc.start_connecting(self.tub, self._trigger_connections)
+ self.servers[serverid] = s
+ s.start_connecting(self.tub, self._trigger_connections)
# the descriptor will manage their own Reconnector, and each time we
# need servers, we'll ask them if they're connected or not.
"application-version": "unknown: no get_version()",
}
- def __init__(self, serverid, ann_d, min_shares=1):
- self.serverid = serverid
- self._tubid = serverid
- self.announcement = ann_d
+ def __init__(self, key_s, ann, min_shares=1):
+ self.key_s = key_s
+ self.announcement = ann
self.min_shares = min_shares
- self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
+ assert "anonymous-storage-FURL" in ann, ann
+ furl = str(ann["anonymous-storage-FURL"])
+ m = re.match(r'pb://(\w+)@', furl)
+ assert m, furl
+ tubid_s = m.group(1).lower()
+ self._tubid = base32.a2b(tubid_s)
+ assert "permutation-seed-base32" in ann, ann
+ ps = base32.a2b(str(ann["permutation-seed-base32"]))
+ self._permutation_seed = ps
+
+ name = key_s or tubid_s
+ self._long_description = name
+ self._short_description = name[:8] # TODO: decide who adds []
+
self.announcement_time = time.time()
self.last_connect_time = None
self.last_loss_time = None
def __repr__(self):
return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self):
- return self._tubid
+ return self._tubid # XXX replace with self.key_s
def get_permutation_seed(self):
- return self._tubid
+ return self._permutation_seed
def get_version(self):
if self.rref:
return self.rref.version
return None
def get_name(self): # keep methodname short
- return self.serverid_s
+ return self._short_description
def get_longname(self):
- return idlib.nodeid_b2a(self._tubid)
+ return self._long_description
def get_lease_seed(self):
return self._tubid
def get_foolscap_write_enabler_seed(self):
return self.announcement_time
def start_connecting(self, tub, trigger_cb):
- furl = self.announcement["FURL"]
+ furl = str(self.announcement["anonymous-storage-FURL"])
self._trigger_cb = trigger_cb
self._reconnector = tub.connectTo(furl, self._got_connection)
for (peerid, nickname) in [("\x00"*20, "peer-0"),
("\xff"*20, "peer-f"),
("\x11"*20, "peer-11")] :
- ann_d = { "version": 0,
- "service-name": "storage",
- "FURL": "fake furl",
- "nickname": unicode(nickname),
- "app-versions": {}, # need #466 and v2 introducer
- "my-version": "ver",
- "oldest-supported": "oldest",
- }
- s = NativeStorageServer(peerid, ann_d)
+ ann = { "version": 0,
+ "service-name": "storage",
+ "anonymous-storage-FURL": "pb://abcde@nowhere/fake",
+ "permutation-seed-base32": "",
+ "nickname": unicode(nickname),
+ "app-versions": {}, # need #466 and v2 introducer
+ "my-version": "ver",
+ "oldest-supported": "oldest",
+ }
+ s = NativeStorageServer(peerid, ann)
sb.test_add_server(peerid, s)
c = FakeClient()
c.storage_broker = sb
self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0)
def _permute(self, sb, key):
- return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ]
+ return [ s.get_longname() for s in sb.get_servers_for_psi(key) ]
def test_permute(self):
sb = StorageFarmBroker(None, True)
for k in ["%d" % i for i in range(5)]:
- sb.test_add_rref(k, "rref")
+ ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake",
+ "permutation-seed-base32": base32.b2a(k) }
+ sb.test_add_rref(k, "rref", ann)
self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2'])
self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3'])
import os, re
from base64 import b32decode
+import simplejson
from twisted.trial import unittest
from twisted.internet import defer
from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
from twisted.application import service
from allmydata.interfaces import InsufficientVersionError
-from allmydata.introducer.client import IntroducerClient
+from allmydata.introducer.client import IntroducerClient, \
+ WrapV2ClientInV1Interface
from allmydata.introducer.server import IntroducerService
+from allmydata.introducer.common import get_tubid_string_from_ann, \
+ get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \
+ UnknownKeyError
+from allmydata.introducer import old
# test compatibility with old introducer .tac files
from allmydata.introducer import IntroducerNode
-from allmydata.util import pollmixin
+from allmydata.util import pollmixin, keyutil
import allmydata.test.common_util as testutil
class LoggingMultiService(service.MultiService):
def test_create(self):
ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
- "my_version", "oldest_version")
+ "my_version", "oldest_version", {})
self.failUnless(isinstance(ic, IntroducerClient))
def test_listen(self):
i = IntroducerService()
i.setServiceParent(self.parent)
- def test_duplicate(self):
+ def test_duplicate_publish(self):
i = IntroducerService()
self.failUnlessEqual(len(i.get_announcements()), 0)
self.failUnlessEqual(len(i.get_subscribers()), 0)
self.failUnlessEqual(len(i.get_announcements()), 2)
self.failUnlessEqual(len(i.get_subscribers()), 0)
+ def test_id_collision(self):
+ # test replacement case where tubid equals a keyid (one should
+ # not replace the other)
+ i = IntroducerService()
+ ic = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "my_version", "oldest_version", {})
+ sk_s, vk_s = keyutil.make_keypair()
+ sk, _ignored = keyutil.parse_privkey(sk_s)
+ keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
+ furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+ ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+ i.remote_publish_v2(ann_t, Referenceable())
+ announcements = i.get_announcements()
+ self.failUnlessEqual(len(announcements), 1)
+ key1 = ("storage", "v0-"+keyid, None)
+ self.failUnless(key1 in announcements)
+ (ign, ign, ann1_out, ign) = announcements[key1]
+ self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
+
+ furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
+ ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
+ i.remote_publish(ann2)
+ self.failUnlessEqual(len(announcements), 2)
+ key2 = ("storage", None, keyid)
+ self.failUnless(key2 in announcements)
+ (ign, ign, ann2_out, ign) = announcements[key2]
+ self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2)
+
+
+def make_ann(furl):
+ ann = { "anonymous-storage-FURL": furl,
+ "permutation-seed-base32": get_tubid_string(furl) }
+ return ann
+
+def make_ann_t(ic, furl, privkey):
+ return ic.create_announcement("storage", make_ann(furl), privkey)
+
+class Client(unittest.TestCase):
+ def test_duplicate_receive_v1(self):
+ ic = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "my_version", "oldest_version", {})
+ announcements = []
+ ic.subscribe_to("storage",
+ lambda key_s,ann: announcements.append(ann))
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
+ ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
+ ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
+ ca = WrapV2ClientInV1Interface(ic)
+
+ ca.remote_announce([ann1])
+ d = fireEventually()
+ def _then(ign):
+ self.failUnlessEqual(len(announcements), 1)
+ self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
+ self.failUnlessEqual(announcements[0]["my-version"], "ver23")
+ self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
+ self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+ self.failUnlessEqual(ic._debug_counts["update"], 0)
+ self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
+ # now send a duplicate announcement: this should not notify clients
+ ca.remote_announce([ann1])
+ return fireEventually()
+ d.addCallback(_then)
+ def _then2(ign):
+ self.failUnlessEqual(len(announcements), 1)
+ self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
+ self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+ self.failUnlessEqual(ic._debug_counts["update"], 0)
+ self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
+ # and a replacement announcement: same FURL, new other stuff.
+ # Clients should be notified.
+ ca.remote_announce([ann1b])
+ return fireEventually()
+ d.addCallback(_then2)
+ def _then3(ign):
+ self.failUnlessEqual(len(announcements), 2)
+ self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
+ self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
+ self.failUnlessEqual(ic._debug_counts["update"], 1)
+ self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
+ # test that the other stuff changed
+ self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
+ self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
+ d.addCallback(_then3)
+ return d
+
+ def test_duplicate_receive_v2(self):
+ ic1 = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "ver23", "oldest_version", {})
+ # we use a second client just to create a different-looking
+ # announcement
+ ic2 = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "ver24","oldest_version",{})
+ announcements = []
+ def _received(key_s, ann):
+ announcements.append( (key_s, ann) )
+ ic1.subscribe_to("storage", _received)
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
+ furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
+ furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
+
+ privkey_s, pubkey_vs = keyutil.make_keypair()
+ privkey, _ignored = keyutil.parse_privkey(privkey_s)
+ pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-")
+
+ # ann1: ic1, furl1
+ # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
+ # ann1b: ic2, furl1
+ # ann2: ic2, furl2
+
+ self.ann1 = make_ann_t(ic1, furl1, privkey)
+ self.ann1a = make_ann_t(ic1, furl1a, privkey)
+ self.ann1b = make_ann_t(ic2, furl1, privkey)
+ self.ann2 = make_ann_t(ic2, furl2, privkey)
+
+ ic1.remote_announce_v2([self.ann1]) # queues eventual-send
+ d = fireEventually()
+ def _then1(ign):
+ self.failUnlessEqual(len(announcements), 1)
+ key_s,ann = announcements[0]
+ self.failUnlessEqual(key_s, pubkey_s)
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+ self.failUnlessEqual(ann["my-version"], "ver23")
+ d.addCallback(_then1)
+
+ # now send a duplicate announcement. This should not fire the
+ # subscriber
+ d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
+ d.addCallback(fireEventually)
+ def _then2(ign):
+ self.failUnlessEqual(len(announcements), 1)
+ d.addCallback(_then2)
+
+ # and a replacement announcement: same FURL, new other stuff. The
+ # subscriber *should* be fired.
+ d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
+ d.addCallback(fireEventually)
+ def _then3(ign):
+ self.failUnlessEqual(len(announcements), 2)
+ key_s,ann = announcements[-1]
+ self.failUnlessEqual(key_s, pubkey_s)
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+ self.failUnlessEqual(ann["my-version"], "ver24")
+ d.addCallback(_then3)
+
+ # and a replacement announcement with a different FURL (it uses
+ # different connection hints)
+ d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
+ d.addCallback(fireEventually)
+ def _then4(ign):
+ self.failUnlessEqual(len(announcements), 3)
+ key_s,ann = announcements[-1]
+ self.failUnlessEqual(key_s, pubkey_s)
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
+ self.failUnlessEqual(ann["my-version"], "ver23")
+ d.addCallback(_then4)
+
+ # now add a new subscription, which should be called with the
+ # backlog. The introducer only records one announcement per index, so
+ # the backlog will only have the latest message.
+ announcements2 = []
+ def _received2(key_s, ann):
+ announcements2.append( (key_s, ann) )
+ d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
+ d.addCallback(fireEventually)
+ def _then5(ign):
+ self.failUnlessEqual(len(announcements2), 1)
+ key_s,ann = announcements2[-1]
+ self.failUnlessEqual(key_s, pubkey_s)
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a)
+ self.failUnlessEqual(ann["my-version"], "ver23")
+ d.addCallback(_then5)
+ return d
+
+ def test_id_collision(self):
+ # test replacement case where tubid equals a keyid (one should
+ # not replace the other)
+ ic = IntroducerClient(None,
+ "introducer.furl", u"my_nickname",
+ "my_version", "oldest_version", {})
+ announcements = []
+ ic.subscribe_to("storage",
+ lambda key_s,ann: announcements.append(ann))
+ sk_s, vk_s = keyutil.make_keypair()
+ sk, _ignored = keyutil.parse_privkey(sk_s)
+ keyid = keyutil.remove_prefix(vk_s, "pub-v0-")
+ furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+ furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid
+ ann_t = ic.create_announcement("storage", make_ann(furl1), sk)
+ ic.remote_announce_v2([ann_t])
+ d = fireEventually()
+ def _then(ign):
+ # first announcement has been processed
+ self.failUnlessEqual(len(announcements), 1)
+ self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],
+ furl1)
+ # now submit a second one, with a tubid that happens to look just
+ # like the pubkey-based serverid we just processed. They should
+ # not overlap.
+ ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")
+ ca = WrapV2ClientInV1Interface(ic)
+ ca.remote_announce([ann2])
+ return fireEventually()
+ d.addCallback(_then)
+ def _then2(ign):
+ # if they overlapped, the second announcement would be ignored
+ self.failUnlessEqual(len(announcements), 2)
+ self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],
+ furl2)
+ d.addCallback(_then2)
+ return d
+
+
class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
def create_tub(self, portnum=0):
assert self.central_portnum == portnum
tub.setLocation("localhost:%d" % self.central_portnum)
-class SystemTest(SystemTestMixin, unittest.TestCase):
-
- def test_system(self):
- self.basedir = "introducer/SystemTest/system"
+class Queue(SystemTestMixin, unittest.TestCase):
+ def test_queue_until_connected(self):
+ self.basedir = "introducer/QueueUntilConnected/queued"
os.makedirs(self.basedir)
- return self.do_system_test(IntroducerService)
- test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
+ self.create_tub()
+ introducer = IntroducerService()
+ introducer.setServiceParent(self.parent)
+ iff = os.path.join(self.basedir, "introducer.furl")
+ ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
+ tub2 = Tub()
+ tub2.setServiceParent(self.parent)
+ c = IntroducerClient(tub2, ifurl,
+ u"nickname", "version", "oldest", {})
+ furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")
+ sk_s, vk_s = keyutil.make_keypair()
+ sk, _ignored = keyutil.parse_privkey(sk_s)
+
+ d = introducer.disownServiceParent()
+ def _offline(ign):
+ # now that the introducer server is offline, create a client and
+ # publish some messages
+ c.setServiceParent(self.parent) # this starts the reconnector
+ c.publish("storage", make_ann(furl1), sk)
+
+ introducer.setServiceParent(self.parent) # restart the server
+ # now wait for the messages to be delivered
+ def _got_announcement():
+ return bool(introducer.get_announcements())
+ return self.poll(_got_announcement)
+ d.addCallback(_offline)
+ def _done(ign):
+ v = list(introducer.get_announcements().values())[0]
+ (ign, ign, ann1_out, ign) = v
+ self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1)
+ d.addCallback(_done)
+
+ # now let the ack get back
+ def _wait_until_idle(ign):
+ def _idle():
+ if c._debug_outstanding:
+ return False
+ if introducer._debug_outstanding:
+ return False
+ return True
+ return self.poll(_idle)
+ d.addCallback(_wait_until_idle)
+ return d
- def do_system_test(self, create_introducer):
+
+V1 = "v1"; V2 = "v2"
+class SystemTest(SystemTestMixin, unittest.TestCase):
+
+ def do_system_test(self, server_version):
self.create_tub()
- introducer = create_introducer()
+ if server_version == V1:
+ introducer = old.IntroducerService_v1()
+ else:
+ introducer = IntroducerService()
introducer.setServiceParent(self.parent)
iff = os.path.join(self.basedir, "introducer.furl")
tub = self.central_tub
ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
self.introducer_furl = ifurl
- NUMCLIENTS = 5
- # we have 5 clients who publish themselves, and an extra one does
- # which not. When the connections are fully established, all six nodes
+ # we have 5 clients who publish themselves as storage servers, and a
+ # sixth which does which not. All 6 clients subscriber to hear about
+ # storage. When the connections are fully established, all six nodes
# should have 5 connections each.
+ NUM_STORAGE = 5
+ NUM_CLIENTS = 6
clients = []
tubs = {}
received_announcements = {}
- NUM_SERVERS = NUMCLIENTS
subscribing_clients = []
publishing_clients = []
+ privkeys = {}
+ expected_announcements = [0 for c in range(NUM_CLIENTS)]
- for i in range(NUMCLIENTS+1):
+ for i in range(NUM_CLIENTS):
tub = Tub()
#tub.setOption("logLocalFailures", True)
#tub.setOption("logRemoteFailures", True)
tub.setLocation("localhost:%d" % portnum)
log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
- c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
- "version", "oldest")
+ if i == 0:
+ c = old.IntroducerClient_v1(tub, self.introducer_furl,
+ u"nickname-%d" % i,
+ "version", "oldest")
+ else:
+ c = IntroducerClient(tub, self.introducer_furl,
+ u"nickname-%d" % i,
+ "version", "oldest",
+ {"component": "component-v1"})
received_announcements[c] = {}
- def got(serverid, ann_d, announcements):
- announcements[serverid] = ann_d
- c.subscribe_to("storage", got, received_announcements[c])
+ def got(key_s_or_tubid, ann, announcements, i):
+ if i == 0:
+ index = get_tubid_string_from_ann(ann)
+ else:
+ index = key_s_or_tubid or get_tubid_string_from_ann(ann)
+ announcements[index] = ann
+ c.subscribe_to("storage", got, received_announcements[c], i)
subscribing_clients.append(c)
-
- if i < NUMCLIENTS:
- node_furl = tub.registerReference(Referenceable())
- c.publish(node_furl, "storage", "ri_name")
+ expected_announcements[i] += 1 # all expect a 'storage' announcement
+
+ node_furl = tub.registerReference(Referenceable())
+ if i < NUM_STORAGE:
+ if i == 0:
+ c.publish(node_furl, "storage", "ri_name")
+ elif i == 1:
+ # sign the announcement
+ privkey_s, pubkey_s = keyutil.make_keypair()
+ privkey, _ignored = keyutil.parse_privkey(privkey_s)
+ privkeys[c] = privkey
+ c.publish("storage", make_ann(node_furl), privkey)
+ else:
+ c.publish("storage", make_ann(node_furl))
publishing_clients.append(c)
- # the last one does not publish anything
+ else:
+ # the last one does not publish anything
+ pass
+
+ if i == 0:
+ # users of the V1 client were required to publish a
+ # 'stub_client' record (somewhat after they published the
+ # 'storage' record), so the introducer could see their
+ # version. Match that behavior.
+ c.publish(node_furl, "stub_client", "stub_ri_name")
+
+ if i == 2:
+ # also publish something that nobody cares about
+ boring_furl = tub.registerReference(Referenceable())
+ c.publish("boring", make_ann(boring_furl))
c.setServiceParent(self.parent)
clients.append(c)
tubs[c] = tub
- def _wait_for_all_connections():
- for c in subscribing_clients:
- if len(received_announcements[c]) < NUM_SERVERS:
+
+ def _wait_for_connected(ign):
+ def _connected():
+ for c in clients:
+ if not c.connected_to_introducer():
+ return False
+ return True
+ return self.poll(_connected)
+
+ # we watch the clients to determine when the system has settled down.
+ # Then we can look inside the server to assert things about its
+ # state.
+
+ def _wait_for_expected_announcements(ign):
+ def _got_expected_announcements():
+ for i,c in enumerate(subscribing_clients):
+ if len(received_announcements[c]) < expected_announcements[i]:
+ return False
+ return True
+ return self.poll(_got_expected_announcements)
+
+ # before shutting down any Tub, we'd like to know that there are no
+ # messages outstanding
+
+ def _wait_until_idle(ign):
+ def _idle():
+ for c in subscribing_clients + publishing_clients:
+ if c._debug_outstanding:
+ return False
+ if introducer._debug_outstanding:
return False
- return True
- d = self.poll(_wait_for_all_connections)
+ return True
+ return self.poll(_idle)
+
+ d = defer.succeed(None)
+ d.addCallback(_wait_for_connected)
+ d.addCallback(_wait_for_expected_announcements)
+ d.addCallback(_wait_until_idle)
def _check1(res):
log.msg("doing _check1")
dc = introducer._debug_counts
- self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
- self.failUnlessEqual(dc["inbound_duplicate"], 0)
+ if server_version == V1:
+ # each storage server publishes a record, and (after its
+ # 'subscribe' has been ACKed) also publishes a "stub_client".
+ # The non-storage client (which subscribes) also publishes a
+ # stub_client. There is also one "boring" service. The number
+ # of messages is higher, because the stub_clients aren't
+ # published until after we get the 'subscribe' ack (since we
+ # don't realize that we're dealing with a v1 server [which
+ # needs stub_clients] until then), and the act of publishing
+ # the stub_client causes us to re-send all previous
+ # announcements.
+ self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
+ NUM_STORAGE + NUM_CLIENTS + 1)
+ else:
+ # each storage server publishes a record. There is also one
+ # "stub_client" and one "boring"
+ self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
+ self.failUnlessEqual(dc["inbound_duplicate"], 0)
self.failUnlessEqual(dc["inbound_update"], 0)
- self.failUnless(dc["outbound_message"])
+ self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
+ # the number of outbound messages is tricky.. I think it depends
+ # upon a race between the publish and the subscribe messages.
+ self.failUnless(dc["outbound_message"] > 0)
+ # each client subscribes to "storage", and each server publishes
+ self.failUnlessEqual(dc["outbound_announcements"],
+ NUM_STORAGE*NUM_CLIENTS)
- for c in clients:
- self.failUnless(c.connected_to_introducer())
for c in subscribing_clients:
cdc = c._debug_counts
self.failUnless(cdc["inbound_message"])
self.failUnlessEqual(cdc["inbound_announcement"],
- NUM_SERVERS)
+ NUM_STORAGE)
self.failUnlessEqual(cdc["wrong_service"], 0)
self.failUnlessEqual(cdc["duplicate_announcement"], 0)
self.failUnlessEqual(cdc["update"], 0)
self.failUnlessEqual(cdc["new_announcement"],
- NUM_SERVERS)
+ NUM_STORAGE)
anns = received_announcements[c]
- self.failUnlessEqual(len(anns), NUM_SERVERS)
+ self.failUnlessEqual(len(anns), NUM_STORAGE)
- nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
- ann_d = anns[nodeid0]
- nick = ann_d["nickname"]
+ nodeid0 = tubs[clients[0]].tubID
+ ann = anns[nodeid0]
+ nick = ann["nickname"]
self.failUnlessEqual(type(nick), unicode)
self.failUnlessEqual(nick, u"nickname-0")
- for c in publishing_clients:
- cdc = c._debug_counts
- self.failUnlessEqual(cdc["outbound_message"], 1)
+ if server_version == V1:
+ for c in publishing_clients:
+ cdc = c._debug_counts
+ expected = 1 # storage
+ if c is clients[2]:
+ expected += 1 # boring
+ if c is not clients[0]:
+ # the v2 client tries to call publish_v2, which fails
+ # because the server is v1. It then re-sends
+ # everything it has so far, plus a stub_client record
+ expected = 2*expected + 1
+ if c is clients[0]:
+ # we always tell v1 client to send stub_client
+ expected += 1
+ self.failUnlessEqual(cdc["outbound_message"], expected)
+ else:
+ for c in publishing_clients:
+ cdc = c._debug_counts
+ expected = 1
+ if c in [clients[0], # stub_client
+ clients[2], # boring
+ ]:
+ expected = 2
+ self.failUnlessEqual(cdc["outbound_message"], expected)
+ log.msg("_check1 done")
d.addCallback(_check1)
# force an introducer reconnect, by shutting down the Tub it's using
d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
- def _wait_for_introducer_loss():
- for c in clients:
- if c.connected_to_introducer():
- return False
- return True
- d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+ def _wait_for_introducer_loss(ign):
+ def _introducer_lost():
+ for c in clients:
+ if c.connected_to_introducer():
+ return False
+ return True
+ return self.poll(_introducer_lost)
+ d.addCallback(_wait_for_introducer_loss)
def _restart_introducer_tub(_ign):
log.msg("restarting introducer's Tub")
-
- dc = introducer._debug_counts
- self.expected_count = dc["inbound_message"] + NUM_SERVERS
- self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
- introducer._debug0 = dc["outbound_message"]
- for c in subscribing_clients:
- cdc = c._debug_counts
- c._debug0 = cdc["inbound_message"]
-
+ # reset counters
+ for i in range(NUM_CLIENTS):
+ c = subscribing_clients[i]
+ for k in c._debug_counts:
+ c._debug_counts[k] = 0
+ for k in introducer._debug_counts:
+ introducer._debug_counts[k] = 0
+ expected_announcements[i] += 1 # new 'storage' for everyone
self.create_tub(self.central_portnum)
newfurl = self.central_tub.registerReference(introducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer_tub)
- def _wait_for_introducer_reconnect():
- # wait until:
- # all clients are connected
- # the introducer has received publish messages from all of them
- # the introducer has received subscribe messages from all of them
- # the introducer has sent (duplicate) announcements to all of them
- # all clients have received (duplicate) announcements
- dc = introducer._debug_counts
- for c in clients:
- if not c.connected_to_introducer():
- return False
- if dc["inbound_message"] < self.expected_count:
- return False
- if dc["inbound_subscribe"] < self.expected_subscribe_count:
- return False
- for c in subscribing_clients:
- cdc = c._debug_counts
- if cdc["inbound_message"] < c._debug0+1:
- return False
- return True
- d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
+ d.addCallback(_wait_for_connected)
+ d.addCallback(_wait_for_expected_announcements)
+ d.addCallback(_wait_until_idle)
+ d.addCallback(lambda _ign: log.msg(" reconnected"))
+ # TODO: publish something while the introducer is offline, then
+ # confirm it gets delivered when the connection is reestablished
def _check2(res):
log.msg("doing _check2")
# assert that the introducer sent out new messages, one per
# subscriber
dc = introducer._debug_counts
- self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
- self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
- self.failUnlessEqual(dc["inbound_update"], 0)
- self.failUnlessEqual(dc["outbound_message"],
- introducer._debug0 + len(subscribing_clients))
- for c in clients:
- self.failUnless(c.connected_to_introducer())
+ self.failUnlessEqual(dc["outbound_announcements"],
+ NUM_STORAGE*NUM_CLIENTS)
+ self.failUnless(dc["outbound_message"] > 0)
+ self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
for c in subscribing_clients:
cdc = c._debug_counts
- self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
+ self.failUnlessEqual(cdc["inbound_message"], 1)
+ self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
+ self.failUnlessEqual(cdc["new_announcement"], 0)
+ self.failUnlessEqual(cdc["wrong_service"], 0)
+ self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
d.addCallback(_check2)
# Then force an introducer restart, by shutting down the Tub,
d.addCallback(lambda _ign: log.msg("shutting down introducer"))
d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
- d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
+ d.addCallback(_wait_for_introducer_loss)
+ d.addCallback(lambda _ign: log.msg("introducer lost"))
def _restart_introducer(_ign):
log.msg("restarting introducer")
self.create_tub(self.central_portnum)
-
- for c in subscribing_clients:
- # record some counters for later comparison. Stash the values
- # on the client itself, because I'm lazy.
- cdc = c._debug_counts
- c._debug1 = cdc["inbound_announcement"]
- c._debug2 = cdc["inbound_message"]
- c._debug3 = cdc["new_announcement"]
- newintroducer = create_introducer()
- self.expected_message_count = NUM_SERVERS
- self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
- self.expected_subscribe_count = len(subscribing_clients)
- newfurl = self.central_tub.registerReference(newintroducer,
+ # reset counters
+ for i in range(NUM_CLIENTS):
+ c = subscribing_clients[i]
+ for k in c._debug_counts:
+ c._debug_counts[k] = 0
+ expected_announcements[i] += 1 # new 'storage' for everyone
+ if server_version == V1:
+ introducer = old.IntroducerService_v1()
+ else:
+ introducer = IntroducerService()
+ newfurl = self.central_tub.registerReference(introducer,
furlFile=iff)
assert newfurl == self.introducer_furl
d.addCallback(_restart_introducer)
- def _wait_for_introducer_reconnect2():
- # wait until:
- # all clients are connected
- # the introducer has received publish messages from all of them
- # the introducer has received subscribe messages from all of them
- # the introducer has sent announcements for everybody to everybody
- # all clients have received all the (duplicate) announcements
- # at that point, the system should be quiescent
- dc = introducer._debug_counts
- for c in clients:
- if not c.connected_to_introducer():
- return False
- if dc["inbound_message"] < self.expected_message_count:
- return False
- if dc["outbound_announcements"] < self.expected_announcement_count:
- return False
- if dc["inbound_subscribe"] < self.expected_subscribe_count:
- return False
- for c in subscribing_clients:
- cdc = c._debug_counts
- if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
- return False
- return True
- d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
+
+ d.addCallback(_wait_for_connected)
+ d.addCallback(_wait_for_expected_announcements)
+ d.addCallback(_wait_until_idle)
def _check3(res):
log.msg("doing _check3")
- for c in clients:
- self.failUnless(c.connected_to_introducer())
+ dc = introducer._debug_counts
+ self.failUnlessEqual(dc["outbound_announcements"],
+ NUM_STORAGE*NUM_CLIENTS)
+ self.failUnless(dc["outbound_message"] > 0)
+ self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
for c in subscribing_clients:
cdc = c._debug_counts
- self.failUnless(cdc["inbound_announcement"] > c._debug1)
- self.failUnless(cdc["inbound_message"] > c._debug2)
- # there should have been no new announcements
- self.failUnlessEqual(cdc["new_announcement"], c._debug3)
- # and the right number of duplicate ones. There were
- # NUM_SERVERS from the servertub restart, and there should be
- # another NUM_SERVERS now
- self.failUnlessEqual(cdc["duplicate_announcement"],
- 2*NUM_SERVERS)
+ self.failUnless(cdc["inbound_message"] > 0)
+ self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
+ self.failUnlessEqual(cdc["new_announcement"], 0)
+ self.failUnlessEqual(cdc["wrong_service"], 0)
+ self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
d.addCallback(_check3)
return d
+
+ def test_system_v2_server(self):
+ self.basedir = "introducer/SystemTest/system_v2_server"
+ os.makedirs(self.basedir)
+ return self.do_system_test(V2)
+ test_system_v2_server.timeout = 480
+ # occasionally takes longer than 350s on "draco"
+
+ def test_system_v1_server(self):
+ self.basedir = "introducer/SystemTest/system_v1_server"
+ os.makedirs(self.basedir)
+ return self.do_system_test(V1)
+ test_system_v1_server.timeout = 480
+ # occasionally takes longer than 350s on "draco"
+
+class FakeRemoteReference:
+ def notifyOnDisconnect(self, *args, **kwargs): pass
+ def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+
+class ClientInfo(unittest.TestCase):
+ def test_client_v2(self):
+ introducer = IntroducerService()
+ tub = introducer_furl = None
+ app_versions = {"whizzy": "fizzy"}
+ client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+ "my_version", "oldest", app_versions)
+ #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+ #ann_s = make_ann_t(client_v2, furl1, None)
+ #introducer.remote_publish_v2(ann_s, Referenceable())
+ subscriber = FakeRemoteReference()
+ introducer.remote_subscribe_v2(subscriber, "storage",
+ client_v2._my_subscriber_info)
+ s = introducer.get_subscribers()
+ self.failUnlessEqual(len(s), 1)
+ sn, when, si, rref = s[0]
+ self.failUnlessIdentical(rref, subscriber)
+ self.failUnlessEqual(sn, "storage")
+ self.failUnlessEqual(si["version"], 0)
+ self.failUnlessEqual(si["oldest-supported"], "oldest")
+ self.failUnlessEqual(si["app-versions"], app_versions)
+ self.failUnlessEqual(si["nickname"], u"nick-v2")
+ self.failUnlessEqual(si["my-version"], "my_version")
+
+ def test_client_v1(self):
+ introducer = IntroducerService()
+ subscriber = FakeRemoteReference()
+ introducer.remote_subscribe(subscriber, "storage")
+ # the v1 subscribe interface had no subscriber_info: that was usually
+ # sent in a separate stub_client pseudo-announcement
+ s = introducer.get_subscribers()
+ self.failUnlessEqual(len(s), 1)
+ sn, when, si, rref = s[0]
+ # rref will be a WrapV1SubscriberInV2Interface around the real
+ # subscriber
+ self.failUnlessIdentical(rref.original, subscriber)
+ self.failUnlessEqual(si, None) # not known yet
+ self.failUnlessEqual(sn, "storage")
+
+ # now submit the stub_client announcement
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+ ann = (furl1, "stub_client", "RIStubClient",
+ u"nick-v1".encode("utf-8"), "my_version", "oldest")
+ introducer.remote_publish(ann)
+ # the server should correlate the two
+ s = introducer.get_subscribers()
+ self.failUnlessEqual(len(s), 1)
+ sn, when, si, rref = s[0]
+ self.failUnlessIdentical(rref.original, subscriber)
+ self.failUnlessEqual(sn, "storage")
+
+ self.failUnlessEqual(si["version"], 0)
+ self.failUnlessEqual(si["oldest-supported"], "oldest")
+ # v1 announcements do not contain app-versions
+ self.failUnlessEqual(si["app-versions"], {})
+ self.failUnlessEqual(si["nickname"], u"nick-v1")
+ self.failUnlessEqual(si["my-version"], "my_version")
+
+ # a subscription that arrives after the stub_client announcement
+ # should be correlated too
+ subscriber2 = FakeRemoteReference()
+ introducer.remote_subscribe(subscriber2, "thing2")
+
+ s = introducer.get_subscribers()
+ subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
+ self.failUnlessEqual(len(subs), 2)
+ (si,rref) = subs["thing2"]
+ self.failUnlessIdentical(rref.original, subscriber2)
+ self.failUnlessEqual(si["version"], 0)
+ self.failUnlessEqual(si["oldest-supported"], "oldest")
+ # v1 announcements do not contain app-versions
+ self.failUnlessEqual(si["app-versions"], {})
+ self.failUnlessEqual(si["nickname"], u"nick-v1")
+ self.failUnlessEqual(si["my-version"], "my_version")
+
+class Announcements(unittest.TestCase):
+ def test_client_v2_unsigned(self):
+ introducer = IntroducerService()
+ tub = introducer_furl = None
+ app_versions = {"whizzy": "fizzy"}
+ client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+ "my_version", "oldest", app_versions)
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+ tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+ ann_s0 = make_ann_t(client_v2, furl1, None)
+ canary0 = Referenceable()
+ introducer.remote_publish_v2(ann_s0, canary0)
+ a = introducer.get_announcements()
+ self.failUnlessEqual(len(a), 1)
+ (index, (ann_s, canary, ann, when)) = a.items()[0]
+ self.failUnlessIdentical(canary, canary0)
+ self.failUnlessEqual(index, ("storage", None, tubid))
+ self.failUnlessEqual(ann["app-versions"], app_versions)
+ self.failUnlessEqual(ann["nickname"], u"nick-v2")
+ self.failUnlessEqual(ann["service-name"], "storage")
+ self.failUnlessEqual(ann["my-version"], "my_version")
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+ def test_client_v2_signed(self):
+ introducer = IntroducerService()
+ tub = introducer_furl = None
+ app_versions = {"whizzy": "fizzy"}
+ client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
+ "my_version", "oldest", app_versions)
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+ sk_s, vk_s = keyutil.make_keypair()
+ sk, _ignored = keyutil.parse_privkey(sk_s)
+ pks = keyutil.remove_prefix(vk_s, "pub-")
+ ann_t0 = make_ann_t(client_v2, furl1, sk)
+ canary0 = Referenceable()
+ introducer.remote_publish_v2(ann_t0, canary0)
+ a = introducer.get_announcements()
+ self.failUnlessEqual(len(a), 1)
+ (index, (ann_s, canary, ann, when)) = a.items()[0]
+ self.failUnlessIdentical(canary, canary0)
+ self.failUnlessEqual(index, ("storage", pks, None))
+ self.failUnlessEqual(ann["app-versions"], app_versions)
+ self.failUnlessEqual(ann["nickname"], u"nick-v2")
+ self.failUnlessEqual(ann["service-name"], "storage")
+ self.failUnlessEqual(ann["my-version"], "my_version")
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+ def test_client_v1(self):
+ introducer = IntroducerService()
+
+ furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
+ tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
+ ann = (furl1, "storage", "RIStorage",
+ u"nick-v1".encode("utf-8"), "my_version", "oldest")
+ introducer.remote_publish(ann)
+
+ a = introducer.get_announcements()
+ self.failUnlessEqual(len(a), 1)
+ (index, (ann_s, canary, ann, when)) = a.items()[0]
+ self.failUnlessEqual(canary, None)
+ self.failUnlessEqual(index, ("storage", None, tubid))
+ self.failUnlessEqual(ann["app-versions"], {})
+ self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8"))
+ self.failUnlessEqual(ann["service-name"], "storage")
+ self.failUnlessEqual(ann["my-version"], "my_version")
+ self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1)
+
+
class TooNewServer(IntroducerService):
VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
{ },
tub.setLocation("localhost:%d" % portnum)
c = IntroducerClient(tub, self.introducer_furl,
- u"nickname-client", "version", "oldest")
+ u"nickname-client", "version", "oldest", {})
announcements = {}
- def got(serverid, ann_d):
- announcements[serverid] = ann_d
+ def got(key_s, ann):
+ announcements[key_s] = ann
c.subscribe_to("storage", got)
c.setServiceParent(self.parent)
d = self.poll(_got_bad)
def _done(res):
self.failUnless(c._introducer_error)
- self.failUnless(c._introducer_error.check(InsufficientVersionError))
+ self.failUnless(c._introducer_error.check(InsufficientVersionError),
+ c._introducer_error)
d.addCallback(_done)
return d
nodeid = b32decode(m.group(1).upper())
self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
+class Signatures(unittest.TestCase):
+ def test_sign(self):
+ ann = {"key1": "value1"}
+ sk_s,vk_s = keyutil.make_keypair()
+ sk,ignored = keyutil.parse_privkey(sk_s)
+ ann_t = sign_to_foolscap(ann, sk)
+ (msg, sig, key) = ann_t
+ self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes
+ self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann)
+ self.failUnless(sig.startswith("v0-"))
+ self.failUnless(key.startswith("v0-"))
+ (ann2,key2) = unsign_from_foolscap(ann_t)
+ self.failUnlessEqual(ann2, ann)
+ self.failUnlessEqual("pub-"+key2, vk_s)
+
+ # bad signature
+ bad_ann = {"key1": "value2"}
+ bad_msg = simplejson.dumps(bad_ann).encode("utf-8")
+ self.failUnlessRaises(keyutil.BadSignatureError,
+ unsign_from_foolscap, (bad_msg,sig,key))
+ # sneaky bad signature should be ignored
+ (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) )
+ self.failUnlessEqual(key2, None)
+ self.failUnlessEqual(ann2, bad_ann)
+
+ # unrecognized signatures
+ self.failUnlessRaises(UnknownKeyError,
+ unsign_from_foolscap, (bad_msg,"v999-sig",key))
+ self.failUnlessRaises(UnknownKeyError,
+ unsign_from_foolscap, (bad_msg,sig,"v999-key"))
+
+
+# add tests of StorageFarmBroker: if it receives duplicate announcements, it
+# should leave the Reconnector in place, also if it receives
+# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
+# should tear down the Reconnector and make a new one. This behavior used to
+# live in the IntroducerClient, and thus used to be tested by test_introducer
+
+# copying more tests from old branch:
+
+# then also add Upgrade test
storage_broker = StorageFarmBroker(None, True)
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
- storage_broker.test_add_rref(peerid, fss)
+ ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
+ "permutation-seed-base32": base32.b2a(peerid) }
+ storage_broker.test_add_rref(peerid, fss, ann)
return storage_broker
def make_nodemaker(s=None, num_peers=10):
newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
- self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
+ self.failUnless("Announcement Summary: storage: 5" in res)
self.failUnless("Subscription Summary: storage: 5" in res)
self.failUnless("tahoe.css" in res)
except unittest.FailTest:
self.failUnlessEqual(data["subscription_summary"],
{"storage": 5})
self.failUnlessEqual(data["announcement_summary"],
- {"storage": 5, "stub_client": 5})
+ {"storage": 5})
self.failUnlessEqual(data["announcement_distinct_hosts"],
- {"storage": 1, "stub_client": 1})
+ {"storage": 1})
except unittest.FailTest:
print
print "GET %s?t=json output was:" % self.introweb_url
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
-from allmydata.util import log
+from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, rref) in servers:
- self.storage_broker.test_add_rref(serverid, rref)
+ ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid),
+ "permutation-seed-base32": base32.b2a(serverid) }
+ self.storage_broker.test_add_rref(serverid, rref, ann)
self.last_servers = [s[1] for s in servers]
def log(self, *args, **kwargs):
def render_JSON(self, ctx):
res = {}
- clients = self.introducer_service.get_subscribers()
- subscription_summary = dict([ (name, len(clients[name]))
- for name in clients ])
- res["subscription_summary"] = subscription_summary
+
+ counts = {}
+ subscribers = self.introducer_service.get_subscribers()
+ for (service_name, ign, ign, ign) in subscribers:
+ if service_name not in counts:
+ counts[service_name] = 0
+ counts[service_name] += 1
+ res["subscription_summary"] = counts
announcement_summary = {}
service_hosts = {}
- for (ann,when) in self.introducer_service.get_announcements().values():
- (furl, service_name, ri_name, nickname, ver, oldest) = ann
+ for a in self.introducer_service.get_announcements().values():
+ (_, _, ann, when) = a
+ service_name = ann["service-name"]
if service_name not in announcement_summary:
announcement_summary[service_name] = 0
announcement_summary[service_name] += 1
# enough: when multiple services are run on a single host,
# they're usually either configured with the same addresses,
# or setLocationAutomatically picks up the same interfaces.
+ furl = ann["anonymous-storage-FURL"]
locations = SturdyRef(furl).getTubRef().getLocations()
# list of tuples, ("ipv4", host, port)
host = frozenset([hint[1]
def render_announcement_summary(self, ctx, data):
services = {}
- for (ann,when) in self.introducer_service.get_announcements().values():
- (furl, service_name, ri_name, nickname, ver, oldest) = ann
+ for a in self.introducer_service.get_announcements().values():
+ (_, _, ann, when) = a
+ service_name = ann["service-name"]
if service_name not in services:
services[service_name] = 0
services[service_name] += 1
for service_name in service_names])
def render_client_summary(self, ctx, data):
+ counts = {}
clients = self.introducer_service.get_subscribers()
- service_names = clients.keys()
- service_names.sort()
- return ", ".join(["%s: %d" % (service_name, len(clients[service_name]))
- for service_name in service_names])
+ for (service_name, ign, ign, ign) in clients:
+ if service_name not in counts:
+ counts[service_name] = 0
+ counts[service_name] += 1
+ return ", ".join([ "%s: %d" % (name, counts[name])
+ for name in sorted(counts.keys()) ] )
def data_services(self, ctx, data):
introsvc = self.introducer_service
- ann = [(since,a)
- for (a,since) in introsvc.get_announcements().values()
- if a[1] != "stub_client"]
- ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
- return ann
-
- def render_service_row(self, ctx, (since,announcement)):
- (furl, service_name, ri_name, nickname, ver, oldest) = announcement
- sr = SturdyRef(furl)
+ services = []
+ for a in introsvc.get_announcements().values():
+ (_, _, ann, when) = a
+ if ann["service-name"] == "stub_client":
+ continue
+ services.append( (when, ann) )
+ services.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"]))
+ # this used to be:
+ #services.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
+ # service_name was the primary key, then the whole tuple (starting
+ # with the furl) was the secondary key
+ return services
+
+ def render_service_row(self, ctx, (since,ann)):
+ sr = SturdyRef(ann["anonymous-storage-FURL"])
nodeid = sr.tubID
advertised = self.show_location_hints(sr)
ctx.fillSlots("peerid", nodeid)
- ctx.fillSlots("nickname", nickname)
+ ctx.fillSlots("nickname", ann["nickname"])
ctx.fillSlots("advertised", " ".join(advertised))
ctx.fillSlots("connected", "?")
TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
ctx.fillSlots("announced",
time.strftime(TIME_FORMAT, time.localtime(since)))
- ctx.fillSlots("version", ver)
- ctx.fillSlots("service_name", service_name)
+ ctx.fillSlots("version", ann["my-version"])
+ ctx.fillSlots("service_name", ann["service-name"])
return ctx.tag
def data_subscribers(self, ctx, data):
- # use the "stub_client" announcements to get information per nodeid
- clients = {}
- for (ann,when) in self.introducer_service.get_announcements().values():
- if ann[1] != "stub_client":
- continue
- (furl, service_name, ri_name, nickname, ver, oldest) = ann
- sr = SturdyRef(furl)
- nodeid = sr.tubID
- clients[nodeid] = ann
-
- # then we actually provide information per subscriber
- s = []
- introsvc = self.introducer_service
- for service_name, subscribers in introsvc.get_subscribers().items():
- for (rref, timestamp) in subscribers.items():
- sr = rref.getSturdyRef()
- nodeid = sr.tubID
- ann = clients.get(nodeid)
- s.append( (service_name, rref, timestamp, ann) )
- s.sort()
- return s
+ return self.introducer_service.get_subscribers()
def render_subscriber_row(self, ctx, s):
- (service_name, rref, since, ann) = s
- nickname = "?"
- version = "?"
- if ann:
- (furl, service_name_2, ri_name, nickname, version, oldest) = ann
+ (service_name, since, info, rref) = s
+ nickname = info.get("nickname", "?")
+ version = info.get("my-version", "?")
sr = rref.getSturdyRef()
# if the subscriber didn't do Tub.setLocation, nodeid will be None