From: Brian Warner Date: Sun, 20 Nov 2011 10:21:32 +0000 (-0800) Subject: new introducer: signed extensible dictionary-based messages! refs #466 X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/css/module-simplejson.encoder.html?a=commitdiff_plain;h=bc21726dfd73b434c8c513ec482f582cce88fc2a;p=tahoe-lafs%2Ftahoe-lafs.git new introducer: signed extensible dictionary-based messages! refs #466 This introduces new client and server halves to the Introducer (renaming the old one with a _V1 suffix). Both have fallbacks to accomodate talking to a different version: the publishing client switches on whether the server's .get_version() advertises V2 support, the server switches on which subscription method was invoked by the subscribing client. The V2 protocol sends a three-tuple of (serialized announcement dictionary, signature, pubkey) for each announcement. The V2 server dispatches messages to subscribers according to the service-name, and throws errors for invalid signatures, but does not otherwise examine the messages. The V2 receiver's subscription callback will receive a (serverid, ann_dict) pair. The 'serverid' will be equal to the pubkey if all of the following are true: the originating client is V2, and was told a privkey to use the announcement went through a V2 server the signature is valid If not, 'serverid' will be equal to the tubid portion of the announced FURL, as was the case for V1 receivers. Servers will create a keypair if one does not exist yet, stored in private/server.privkey . The signed announcement dictionary puts the server FURL in a key named "anonymous-storage-FURL", which anticipates upcoming Accounting-related changes in the server advertisements. It also provides a key named "permutation-seed-base32" to tell clients what permutation seed to use. This is computed at startup, using tubid if there are existing shares, otherwise the pubkey, to retain share-order compatibility for existing servers. --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index b78fd7b9..d42b6acf 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -1,12 +1,10 @@ import os, stat, time, weakref -from allmydata.interfaces import RIStorageServer from allmydata import node from zope.interface import implements from twisted.internet import reactor, defer from twisted.application import service from twisted.application.internet import TimerService -from foolscap.api import Referenceable from pycryptopp.publickey import rsa import allmydata @@ -16,14 +14,13 @@ from allmydata.immutable.upload import Uploader from allmydata.immutable.offloaded import Helper from allmydata.control import ControlServer from allmydata.introducer.client import IntroducerClient -from allmydata.util import hashutil, base32, pollmixin, log +from allmydata.util import hashutil, base32, pollmixin, log, keyutil from allmydata.util.encodingutil import get_filesystem_encoding from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.util.time_format import parse_duration, parse_date from allmydata.stats import StatsProvider from allmydata.history import History -from allmydata.interfaces import IStatsProducer, RIStubClient, \ - SDMF_VERSION, MDMF_VERSION +from allmydata.interfaces import IStatsProducer, SDMF_VERSION, MDMF_VERSION from allmydata.nodemaker import NodeMaker from allmydata.blacklist import Blacklist from allmydata.node import OldConfigOptionError @@ -35,9 +32,6 @@ GiB=1024*MiB TiB=1024*GiB PiB=1024*TiB -class StubClient(Referenceable): - implements(RIStubClient) - def _make_secret(): return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" @@ -174,7 +168,8 @@ class Client(node.Node, pollmixin.PollMixin): ic = IntroducerClient(self.tub, self.introducer_furl, self.nickname, str(allmydata.__full_version__), - str(self.OLDEST_SUPPORTED_VERSION)) + str(self.OLDEST_SUPPORTED_VERSION), + self.get_app_versions()) self.introducer_client = ic # hold off on starting the IntroducerClient until our tub has been # started, so we'll have a useful address on our RemoteReference, so @@ -203,12 +198,46 @@ class Client(node.Node, pollmixin.PollMixin): self.convergence = base32.a2b(convergence_s) self._secret_holder = SecretHolder(lease_secret, self.convergence) + def _maybe_create_server_key(self): + # we only create the key once. On all subsequent runs, we re-use the + # existing key + def _make_key(): + sk_vs,vk_vs = keyutil.make_keypair() + return sk_vs+"\n" + sk_vs = self.get_or_create_private_config("server.privkey", _make_key) + sk,vk_vs = keyutil.parse_privkey(sk_vs.strip()) + self.write_config("server.pubkey", vk_vs+"\n") + self._server_key = sk + + def _init_permutation_seed(self, ss): + seed = self.get_config_from_file("permutation-seed") + if not seed: + have_shares = ss.have_shares() + if have_shares: + # if the server has shares but not a recorded + # permutation-seed, then it has been around since pre-#466 + # days, and the clients who uploaded those shares used our + # TubID as a permutation-seed. We should keep using that same + # seed to keep the shares in the same place in the permuted + # ring, so those clients don't have to perform excessive + # searches. + seed = base32.b2a(self.nodeid) + else: + # otherwise, we're free to use the more natural seed of our + # pubkey-based serverid + vk_bytes = self._server_key.get_verifying_key_bytes() + seed = base32.b2a(vk_bytes) + self.write_config("permutation-seed", seed+"\n") + return seed.strip() + def init_storage(self): # should we run a storage server (and publish it for others to use)? if not self.get_config("storage", "enabled", True, boolean=True): return readonly = self.get_config("storage", "readonly", False, boolean=True) + self._maybe_create_server_key() + storedir = os.path.join(self.basedir, self.STOREDIR) data = self.get_config("storage", "reserved_space", None) @@ -262,8 +291,10 @@ class Client(node.Node, pollmixin.PollMixin): def _publish(res): furl_file = os.path.join(self.basedir, "private", "storage.furl").encode(get_filesystem_encoding()) furl = self.tub.registerReference(ss, furlFile=furl_file) - ri_name = RIStorageServer.__remote_name__ - self.introducer_client.publish(furl, "storage", ri_name) + ann = {"anonymous-storage-FURL": furl, + "permutation-seed-base32": self._init_permutation_seed(ss), + } + self.introducer_client.publish("storage", ann, self._server_key) d.addCallback(_publish) d.addErrback(log.err, facility="tahoe.init", level=log.BAD, umid="aLGBKw") @@ -281,7 +312,6 @@ class Client(node.Node, pollmixin.PollMixin): self.terminator.setServiceParent(self) self.add_service(Uploader(helper_furl, self.stats_provider, self.history)) - self.init_stub_client() self.init_blacklist() self.init_nodemaker() @@ -321,20 +351,6 @@ class Client(node.Node, pollmixin.PollMixin): def get_storage_broker(self): return self.storage_broker - def init_stub_client(self): - def _publish(res): - # we publish an empty object so that the introducer can count how - # many clients are connected and see what versions they're - # running. - sc = StubClient() - furl = self.tub.registerReference(sc) - ri_name = RIStubClient.__remote_name__ - self.introducer_client.publish(furl, "stub_client", ri_name) - d = self.when_tub_ready() - d.addCallback(_publish) - d.addErrback(log.err, facility="tahoe.init", - level=log.BAD, umid="OEHq3g") - def init_blacklist(self): fn = os.path.join(self.basedir, "access.blacklist") self.blacklist = Blacklist(fn) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index a8b11802..163e49da 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -30,14 +30,6 @@ WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests -class RIStubClient(RemoteInterface): - """Each client publishes a service announcement for a dummy object called - the StubClient. This object doesn't actually offer any services, but the - announcement helps the Introducer keep track of which clients are - subscribed (so the grid admin can keep track of things like the size of - the grid and the client versions in use. This is the (empty) - RemoteInterface for the StubClient.""" - class RIBucketWriter(RemoteInterface): """ Objects of this kind live on the server side. """ def write(offset=Offset, data=ShareData): diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index 31fbb5c2..260e13cf 100644 --- a/src/allmydata/introducer/client.py +++ b/src/allmydata/introducer/client.py @@ -1,29 +1,76 @@ -from base64 import b32decode +import time from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable, SturdyRef, eventually +from foolscap.api import Referenceable, eventually, RemoteInterface from allmydata.interfaces import InsufficientVersionError -from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ - IIntroducerClient -from allmydata.util import log, idlib -from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref +from allmydata.introducer.interfaces import IIntroducerClient, \ + RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2 +from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\ + convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \ + make_index, get_tubid_string_from_ann, get_tubid_string +from allmydata.util import log +from allmydata.util.rrefutil import add_version_to_remote_reference +from allmydata.util.keyutil import BadSignatureError + +class WrapV2ClientInV1Interface(Referenceable): # for_v1 + """I wrap a v2 IntroducerClient to make it look like a v1 client, so it + can be attached to an old server.""" + implements(RIIntroducerSubscriberClient_v1) + + def __init__(self, original): + self.original = original + def remote_announce(self, announcements): + lp = self.original.log("received %d announcements (v1)" % + len(announcements)) + anns_v1 = set([convert_announcement_v1_to_v2(ann_v1) + for ann_v1 in announcements]) + return self.original.got_announcements(anns_v1, lp) + + def remote_set_encoding_parameters(self, parameters): + self.original.remote_set_encoding_parameters(parameters) + +class RIStubClient(RemoteInterface): # for_v1 + """Each client publishes a service announcement for a dummy object called + the StubClient. This object doesn't actually offer any services, but the + announcement helps the Introducer keep track of which clients are + subscribed (so the grid admin can keep track of things like the size of + the grid and the client versions in use. This is the (empty) + RemoteInterface for the StubClient.""" + +class StubClient(Referenceable): # for_v1 + implements(RIStubClient) + +V1 = "http://allmydata.org/tahoe/protocols/introducer/v1" +V2 = "http://allmydata.org/tahoe/protocols/introducer/v2" class IntroducerClient(service.Service, Referenceable): - implements(RIIntroducerSubscriberClient, IIntroducerClient) + implements(RIIntroducerSubscriberClient_v2, IIntroducerClient) def __init__(self, tub, introducer_furl, - nickname, my_version, oldest_supported): + nickname, my_version, oldest_supported, + app_versions): self._tub = tub self.introducer_furl = introducer_furl assert type(nickname) is unicode - self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 + self._nickname = nickname self._my_version = my_version self._oldest_supported = oldest_supported + self._app_versions = app_versions - self._published_announcements = set() + self._my_subscriber_info = { "version": 0, + "nickname": self._nickname, + "app-versions": self._app_versions, + "my-version": self._my_version, + "oldest-supported": self._oldest_supported, + } + self._stub_client = None # for_v1 + self._stub_client_furl = None + + self._published_announcements = {} + self._canary = Referenceable() self._publisher = None @@ -33,10 +80,11 @@ class IntroducerClient(service.Service, Referenceable): # _current_announcements remembers one announcement per # (servicename,serverid) pair. Anything that arrives with the same - # pair will displace the previous one. This stores unpacked - # announcement dictionaries, which can be compared for equality to - # distinguish re-announcement from updates. It also provides memory - # for clients who subscribe after startup. + # pair will displace the previous one. This stores tuples of + # (unpacked announcement dictionary, verifyingkey, rxtime). The ann + # dicts can be compared for equality to distinguish re-announcement + # from updates. It also provides memory for clients who subscribe + # after startup. self._current_announcements = {} self.encoding_parameters = None @@ -51,6 +99,11 @@ class IntroducerClient(service.Service, Referenceable): "new_announcement": 0, "outbound_message": 0, } + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res def startService(self): service.Service.startService(self) @@ -79,10 +132,9 @@ class IntroducerClient(service.Service, Referenceable): def _got_versioned_introducer(self, publisher): self.log("got introducer version: %s" % (publisher.version,)) - # we require a V1 introducer - needed = "http://allmydata.org/tahoe/protocols/introducer/v1" - if needed not in publisher.version: - raise InsufficientVersionError(needed, publisher.version) + # we require an introducer that speaks at least one of (V1, V2) + if not (V1 in publisher.version or V2 in publisher.version): + raise InsufficientVersionError("V1 or V2", publisher.version) self._publisher = publisher publisher.notifyOnDisconnect(self._disconnected) self._maybe_publish() @@ -95,24 +147,17 @@ class IntroducerClient(service.Service, Referenceable): def log(self, *args, **kwargs): if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" + kwargs["facility"] = "tahoe.introducer.client" return log.msg(*args, **kwargs) - - def publish(self, furl, service_name, remoteinterface_name): - assert type(self._nickname_utf8) is str # we always send UTF-8 - ann = (furl, service_name, remoteinterface_name, - self._nickname_utf8, self._my_version, self._oldest_supported) - self._published_announcements.add(ann) - self._maybe_publish() - def subscribe_to(self, service_name, cb, *args, **kwargs): self._local_subscribers.append( (service_name,cb,args,kwargs) ) self._subscribed_service_names.add(service_name) self._maybe_subscribe() - for (servicename,nodeid),ann_d in self._current_announcements.items(): + for index,(ann,key_s,when) in self._current_announcements.items(): + servicename = index[0] if servicename == service_name: - eventually(cb, nodeid, ann_d) + eventually(cb, key_s, ann, *args, **kwargs) def _maybe_subscribe(self): if not self._publisher: @@ -120,96 +165,160 @@ class IntroducerClient(service.Service, Referenceable): level=log.NOISY) return for service_name in self._subscribed_service_names: - if service_name not in self._subscriptions: - # there is a race here, but the subscription desk ignores - # duplicate requests. - self._subscriptions.add(service_name) - d = self._publisher.callRemote("subscribe", self, service_name) - d.addErrback(trap_deadref) - d.addErrback(log.err, format="server errored during subscribe", - facility="tahoe.introducer", - level=log.WEIRD, umid="2uMScQ") + if service_name in self._subscriptions: + continue + self._subscriptions.add(service_name) + if V2 in self._publisher.version: + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe_v2", + self, service_name, + self._my_subscriber_info) + d.addBoth(self._debug_retired) + else: + d = self._subscribe_handle_v1(service_name) # for_v1 + d.addErrback(log.err, facility="tahoe.introducer.client", + level=log.WEIRD, umid="2uMScQ") + + def _subscribe_handle_v1(self, service_name): # for_v1 + # they don't speak V2: must be a v1 introducer. Fall back to the v1 + # 'subscribe' method, using a client adapter. + ca = WrapV2ClientInV1Interface(self) + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe", ca, service_name) + d.addBoth(self._debug_retired) + # We must also publish an empty 'stub_client' object, so the + # introducer can count how many clients are connected and see what + # versions they're running. + if not self._stub_client_furl: + self._stub_client = sc = StubClient() + self._stub_client_furl = self._tub.registerReference(sc) + def _publish_stub_client(ignored): + furl = self._stub_client_furl + self.publish("stub_client", + { "anonymous-storage-FURL": furl, + "permutation-seed-base32": get_tubid_string(furl), + }) + d.addCallback(_publish_stub_client) + return d + + def create_announcement(self, service_name, ann, signing_key): + full_ann = { "version": 0, + "nickname": self._nickname, + "app-versions": self._app_versions, + "my-version": self._my_version, + "oldest-supported": self._oldest_supported, + + "service-name": service_name, + } + full_ann.update(ann) + return sign_to_foolscap(full_ann, signing_key) + + def publish(self, service_name, ann, signing_key=None): + ann_t = self.create_announcement(service_name, ann, signing_key) + self._published_announcements[service_name] = ann_t + self._maybe_publish() def _maybe_publish(self): if not self._publisher: self.log("want to publish, but no introducer yet", level=log.NOISY) return # this re-publishes everything. The Introducer ignores duplicates - for ann in self._published_announcements: + for ann_t in self._published_announcements.values(): self._debug_counts["outbound_message"] += 1 - d = self._publisher.callRemote("publish", ann) - d.addErrback(trap_deadref) - d.addErrback(log.err, - format="server errored during publish %(ann)s", - ann=ann, facility="tahoe.introducer", + if V2 in self._publisher.version: + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish_v2", ann_t, + self._canary) + d.addBoth(self._debug_retired) + else: + d = self._handle_v1_publisher(ann_t) # for_v1 + d.addErrback(log.err, ann_t=ann_t, + facility="tahoe.introducer.client", level=log.WEIRD, umid="xs9pVQ") + def _handle_v1_publisher(self, ann_t): # for_v1 + # they don't speak V2, so fall back to the old 'publish' method + # (which takes an unsigned tuple of bytestrings) + self.log("falling back to publish_v1", + level=log.UNUSUAL, umid="9RCT1A") + ann_v1 = convert_announcement_v2_to_v1(ann_t) + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish", ann_v1) + d.addBoth(self._debug_retired) + return d - def remote_announce(self, announcements): - self.log("received %d announcements" % len(announcements)) + def remote_announce_v2(self, announcements): + lp = self.log("received %d announcements (v2)" % len(announcements)) + return self.got_announcements(announcements, lp) + + def got_announcements(self, announcements, lp=None): + # this is the common entry point for both v1 and v2 announcements self._debug_counts["inbound_message"] += 1 - for ann in announcements: + for ann_t in announcements: try: - self._process_announcement(ann) - except: - log.err(format="unable to process announcement %(ann)s", - ann=ann) - # Don't let a corrupt announcement prevent us from processing - # the remaining ones. Don't return an error to the server, - # since they'd just ignore it anyways. - pass - - def _process_announcement(self, ann): + # this might raise UnknownKeyError or bad-sig error + ann, key_s = unsign_from_foolscap(ann_t) + # key is "v0-base32abc123" + except BadSignatureError: + self.log("bad signature on inbound announcement: %s" % (ann_t,), + parent=lp, level=log.WEIRD, umid="ZAU15Q") + # process other announcements that arrived with the bad one + continue + + self._process_announcement(ann, key_s) + + def _process_announcement(self, ann, key_s): self._debug_counts["inbound_announcement"] += 1 - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann + service_name = str(ann["service-name"]) if service_name not in self._subscribed_service_names: self.log("announcement for a service we don't care about [%s]" % (service_name,), level=log.UNUSUAL, umid="dIpGNA") self._debug_counts["wrong_service"] += 1 return - self.log("announcement for [%s]: %s" % (service_name, ann), - umid="BoKEag") - assert type(furl) is str - assert type(service_name) is str - assert type(ri_name) is str - assert type(nickname_utf8) is str - nickname = nickname_utf8.decode("utf-8") - assert type(nickname) is unicode - assert type(ver) is str - assert type(oldest) is str - - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - nodeid_s = idlib.shortnodeid_b2a(nodeid) - - ann_d = { "version": 0, - "service-name": service_name, - - "FURL": furl, - "nickname": nickname, - "app-versions": {}, # need #466 and v2 introducer - "my-version": ver, - "oldest-supported": oldest, - } - - index = (service_name, nodeid) - if self._current_announcements.get(index, None) == ann_d: - self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", - service=service_name, nodeid=nodeid_s, - level=log.UNUSUAL, umid="B1MIdA") + # for ASCII values, simplejson might give us unicode *or* bytes + if "nickname" in ann and isinstance(ann["nickname"], str): + ann["nickname"] = unicode(ann["nickname"]) + nick_s = ann.get("nickname",u"").encode("utf-8") + lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s", + nick=nick_s, svc=service_name, ann=ann, umid="BoKEag") + + # how do we describe this node in the logs? + desc_bits = [] + if key_s: + desc_bits.append("serverid=" + key_s[:20]) + if "anonymous-storage-FURL" in ann: + tubid_s = get_tubid_string_from_ann(ann) + desc_bits.append("tubid=" + tubid_s[:8]) + description = "/".join(desc_bits) + + # the index is used to track duplicates + index = make_index(ann, key_s) + + # is this announcement a duplicate? + if (index in self._current_announcements + and self._current_announcements[index][0] == ann): + self.log(format="reannouncement for [%(service)s]:%(description)s, ignoring", + service=service_name, description=description, + parent=lp2, level=log.UNUSUAL, umid="B1MIdA") self._debug_counts["duplicate_announcement"] += 1 return + # does it update an existing one? if index in self._current_announcements: self._debug_counts["update"] += 1 + self.log("replacing old announcement: %s" % (ann,), + parent=lp2, level=log.NOISY, umid="wxwgIQ") else: self._debug_counts["new_announcement"] += 1 + self.log("new announcement[%s]" % service_name, + parent=lp2, level=log.NOISY) - self._current_announcements[index] = ann_d + self._current_announcements[index] = (ann, key_s, time.time()) # note: we never forget an index, but we might update its value for (service_name2,cb,args,kwargs) in self._local_subscribers: if service_name2 == service_name: - eventually(cb, nodeid, ann_d, *args, **kwargs) + eventually(cb, key_s, ann, *args, **kwargs) def remote_set_encoding_parameters(self, parameters): self.encoding_parameters = parameters diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py new file mode 100644 index 00000000..2f6e9c89 --- /dev/null +++ b/src/allmydata/introducer/common.py @@ -0,0 +1,92 @@ + +import re, simplejson +from allmydata.util import keyutil, base32 + +def make_index(ann, key_s): + """Return something that can be used as an index (e.g. a tuple of + strings), such that two messages that refer to the same 'thing' will have + the same index. This is a tuple of (service-name, signing-key, None) for + signed announcements, or (service-name, None, tubid) for unsigned + announcements.""" + + service_name = str(ann["service-name"]) + if key_s: + return (service_name, key_s, None) + else: + tubid = get_tubid_string_from_ann(ann) + return (service_name, None, tubid) + +def get_tubid_string_from_ann(ann): + return get_tubid_string(str(ann.get("anonymous-storage-FURL") + or ann.get("FURL"))) + +def get_tubid_string(furl): + m = re.match(r'pb://(\w+)@', furl) + assert m + return m.group(1).lower() + +def convert_announcement_v1_to_v2(ann_t): + (furl, service_name, ri_name, nickname, ver, oldest) = ann_t + assert type(furl) is str + assert type(service_name) is str + # ignore ri_name + assert type(nickname) is str + assert type(ver) is str + assert type(oldest) is str + ann = {"version": 0, + "nickname": nickname.decode("utf-8"), + "app-versions": {}, + "my-version": ver, + "oldest-supported": oldest, + + "service-name": service_name, + "anonymous-storage-FURL": furl, + "permutation-seed-base32": get_tubid_string(furl), + } + msg = simplejson.dumps(ann).encode("utf-8") + return (msg, None, None) + +def convert_announcement_v2_to_v1(ann_v2): + (msg, sig, pubkey) = ann_v2 + ann = simplejson.loads(msg) + assert ann["version"] == 0 + ann_t = (str(ann["anonymous-storage-FURL"]), + str(ann["service-name"]), + "remoteinterface-name is unused", + ann["nickname"].encode("utf-8"), + str(ann["my-version"]), + str(ann["oldest-supported"]), + ) + return ann_t + + +def sign_to_foolscap(ann, sk): + # return (bytes, None, None) or (bytes, sig-str, pubkey-str). A future + # HTTP-based serialization will use JSON({msg:b64(JSON(msg).utf8), + # sig:v0-b64(sig), pubkey:v0-b64(pubkey)}) . + msg = simplejson.dumps(ann).encode("utf-8") + if sk: + sig = "v0-"+base32.b2a(sk.sign(msg)) + vk_bytes = sk.get_verifying_key_bytes() + ann_t = (msg, sig, "v0-"+base32.b2a(vk_bytes)) + else: + ann_t = (msg, None, None) + return ann_t + +class UnknownKeyError(Exception): + pass + +def unsign_from_foolscap(ann_t): + (msg, sig_vs, claimed_key_vs) = ann_t + key_vs = None + if sig_vs and claimed_key_vs: + if not sig_vs.startswith("v0-"): + raise UnknownKeyError("only v0- signatures recognized") + if not claimed_key_vs.startswith("v0-"): + raise UnknownKeyError("only v0- keys recognized") + claimed_key = keyutil.parse_pubkey("pub-"+claimed_key_vs) + sig_bytes = base32.a2b(keyutil.remove_prefix(sig_vs, "v0-")) + claimed_key.verify(sig_bytes, msg) + key_vs = claimed_key_vs + ann = simplejson.loads(msg.decode("utf-8")) + return (ann, key_vs) diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py index 54f1701f..53d875ac 100644 --- a/src/allmydata/introducer/interfaces.py +++ b/src/allmydata/introducer/interfaces.py @@ -1,9 +1,12 @@ from zope.interface import Interface from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ - RemoteInterface + RemoteInterface, Referenceable +from old import RIIntroducerSubscriberClient_v1 FURL = StringConstraint(1000) +# old introducer protocol (v1): +# # Announcements are (FURL, service_name, remoteinterface_name, # nickname, my_version, oldest_supported) # the (FURL, service_name, remoteinterface_name) refer to the service being @@ -14,13 +17,17 @@ FURL = StringConstraint(1000) # incompatible peer. The second goal is to enable the development of # backwards-compatibility code. -Announcement = TupleOf(FURL, str, str, - str, str, str) +Announcement_v1 = TupleOf(FURL, str, str, + str, str, str) -class RIIntroducerSubscriberClient(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" +# v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str) +# or (bytes, none, none) +Announcement_v2 = Any() - def announce(announcements=SetOf(Announcement)): +class RIIntroducerSubscriberClient_v2(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com" + + def announce_v2(announcements=SetOf(Announcement_v2)): """I accept announcements from the publisher.""" return None @@ -41,38 +48,29 @@ class RIIntroducerSubscriberClient(RemoteInterface): """ return None -# When Foolscap can handle multiple interfaces (Foolscap#17), the -# full-powered introducer will implement both RIIntroducerPublisher and -# RIIntroducerSubscriberService. Until then, we define -# RIIntroducerPublisherAndSubscriberService as a combination of the two, and -# make everybody use that. +SubscriberInfo = DictOf(str, Any()) -class RIIntroducerPublisher(RemoteInterface): +class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface): """To publish a service to the world, connect to me and give me your - announcement message. I will deliver a copy to all connected subscribers.""" - __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" - - def publish(announcement=Announcement): - # canary? - return None - -class RIIntroducerSubscriberService(RemoteInterface): - __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" - - def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): - """Give me a subscriber reference, and I will call its new_peers() - method will any announcements that match the desired service name. I - will ignore duplicate subscriptions. - """ - return None - -class RIIntroducerPublisherAndSubscriberService(RemoteInterface): - __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" + announcement message. I will deliver a copy to all connected subscribers. + To hear about services, connect to me and subscribe to a specific + service_name.""" + __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com" def get_version(): return DictOf(str, Any()) - def publish(announcement=Announcement): + def publish(announcement=Announcement_v1): + return None + def publish_v2(announcement=Announcement_v2, canary=Referenceable): return None - def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + return None + def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2, + service_name=str, subscriber_info=SubscriberInfo): + """Give me a subscriber reference, and I will call its announce_v2() + method with any announcements that match the desired service name. I + will ignore duplicate subscriptions. The subscriber_info dictionary + tells me about the subscriber, and is used for diagnostic/status + displays.""" return None class IIntroducerClient(Interface): @@ -80,41 +78,47 @@ class IIntroducerClient(Interface): publish their services to the rest of the world, and I help them learn about services available on other nodes.""" - def publish(furl, service_name, remoteinterface_name): - """Once you call this, I will tell the world that the Referenceable - available at FURL is available to provide a service named - SERVICE_NAME. The precise definition of the service being provided is - identified by the Foolscap 'remote interface name' in the last - parameter: this is supposed to be a globally-unique string that - identifies the RemoteInterface that is implemented.""" + def publish(service_name, ann, signing_key=None): + """Publish the given announcement dictionary (which must be + JSON-serializable), plus some additional keys, to the world. + + Each announcement is characterized by a (service_name, serverid) + pair. When the server sees two announcements with the same pair, the + later one will replace the earlier one. The serverid is derived from + the signing_key, if present, otherwise it is derived from the + 'anonymous-storage-FURL' key. + + If signing_key= is set to an instance of SigningKey, it will be + used to sign the announcement.""" def subscribe_to(service_name, callback, *args, **kwargs): """Call this if you will eventually want to use services with the given SERVICE_NAME. This will prompt me to subscribe to announcements of those services. Your callback will be invoked with at least two - arguments: a serverid (binary string), and an announcement - dictionary, followed by any additional callback args/kwargs you give - me. I will run your callback for both new announcements and for + arguments: a pubkey and an announcement dictionary, followed by any + additional callback args/kwargs you gave me. The pubkey will be None + unless the announcement was signed by the corresponding pubkey, in + which case it will be a printable string like 'v0-base32..'. + + I will run your callback for both new announcements and for announcements that have changed, but you must be prepared to tolerate duplicates. - The announcement dictionary that I give you will have the following - keys: + The announcement that I give you comes from some other client. It + will be a JSON-serializable dictionary which (by convention) is + expected to have at least the following keys: version: 0 - service-name: str('storage') - - FURL: str(furl) - remoteinterface-name: str(ri_name) nickname: unicode app-versions: {} my-version: str oldest-supported: str - Note that app-version will be an empty dictionary until #466 is done - and both the introducer and the remote client have been upgraded. For - current (native) server types, the serverid will always be equal to - the binary form of the FURL's tubid. + service-name: str('storage') + anonymous-storage-FURL: str(furl) + + Note that app-version will be an empty dictionary if either the + publishing client or the Introducer are running older code. """ def connected_to_introducer(): diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py new file mode 100644 index 00000000..e0bdacf7 --- /dev/null +++ b/src/allmydata/introducer/old.py @@ -0,0 +1,463 @@ + +import time +from base64 import b32decode +from zope.interface import implements, Interface +from twisted.application import service +import allmydata +from allmydata.interfaces import InsufficientVersionError +from allmydata.util import log, idlib, rrefutil +from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ + RemoteInterface, Referenceable, eventually, SturdyRef +FURL = StringConstraint(1000) + +# We keep a copy of the old introducer (both client and server) here to +# support compatibility tests. The old client is supposed to handle the new +# server, and new client is supposed to handle the old server. + + +# Announcements are (FURL, service_name, remoteinterface_name, +# nickname, my_version, oldest_supported) +# the (FURL, service_name, remoteinterface_name) refer to the service being +# announced. The (nickname, my_version, oldest_supported) refer to the +# client as a whole. The my_version/oldest_supported strings can be parsed +# by an allmydata.util.version.Version instance, and then compared. The +# first goal is to make sure that nodes are not confused by speaking to an +# incompatible peer. The second goal is to enable the development of +# backwards-compatibility code. + +Announcement = TupleOf(FURL, str, str, + str, str, str) + +class RIIntroducerSubscriberClient_v1(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" + + def announce(announcements=SetOf(Announcement)): + """I accept announcements from the publisher.""" + return None + + def set_encoding_parameters(parameters=(int, int, int)): + """Advise the client of the recommended k-of-n encoding parameters + for this grid. 'parameters' is a tuple of (k, desired, n), where 'n' + is the total number of shares that will be created for any given + file, while 'k' is the number of shares that must be retrieved to + recover that file, and 'desired' is the minimum number of shares that + must be placed before the uploader will consider its job a success. + n/k is the expansion ratio, while k determines the robustness. + + Introducers should specify 'n' according to the expected size of the + grid (there is no point to producing more shares than there are + peers), and k according to the desired reliability-vs-overhead goals. + + Note that setting k=1 is equivalent to simple replication. + """ + return None + +# When Foolscap can handle multiple interfaces (Foolscap#17), the +# full-powered introducer will implement both RIIntroducerPublisher and +# RIIntroducerSubscriberService. Until then, we define +# RIIntroducerPublisherAndSubscriberService as a combination of the two, and +# make everybody use that. + +class RIIntroducerPublisher_v1(RemoteInterface): + """To publish a service to the world, connect to me and give me your + announcement message. I will deliver a copy to all connected subscribers.""" + __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" + + def publish(announcement=Announcement): + # canary? + return None + +class RIIntroducerSubscriberService_v1(RemoteInterface): + __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" + + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + """Give me a subscriber reference, and I will call its new_peers() + method will any announcements that match the desired service name. I + will ignore duplicate subscriptions. + """ + return None + +class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface): + __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" + def get_version(): + return DictOf(str, Any()) + def publish(announcement=Announcement): + return None + def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): + return None + +class IIntroducerClient(Interface): + """I provide service introduction facilities for a node. I help nodes + publish their services to the rest of the world, and I help them learn + about services available on other nodes.""" + + def publish(furl, service_name, remoteinterface_name): + """Once you call this, I will tell the world that the Referenceable + available at FURL is available to provide a service named + SERVICE_NAME. The precise definition of the service being provided is + identified by the Foolscap 'remote interface name' in the last + parameter: this is supposed to be a globally-unique string that + identifies the RemoteInterface that is implemented.""" + + def subscribe_to(service_name, callback, *args, **kwargs): + """Call this if you will eventually want to use services with the + given SERVICE_NAME. This will prompt me to subscribe to announcements + of those services. Your callback will be invoked with at least two + arguments: a serverid (binary string), and an announcement + dictionary, followed by any additional callback args/kwargs you give + me. I will run your callback for both new announcements and for + announcements that have changed, but you must be prepared to tolerate + duplicates. + + The announcement dictionary that I give you will have the following + keys: + + version: 0 + service-name: str('storage') + + FURL: str(furl) + remoteinterface-name: str(ri_name) + nickname: unicode + app-versions: {} + my-version: str + oldest-supported: str + + Note that app-version will be an empty dictionary until #466 is done + and both the introducer and the remote client have been upgraded. For + current (native) server types, the serverid will always be equal to + the binary form of the FURL's tubid. + """ + + def connected_to_introducer(): + """Returns a boolean, True if we are currently connected to the + introducer, False if not.""" + + +class IntroducerClient_v1(service.Service, Referenceable): + implements(RIIntroducerSubscriberClient_v1, IIntroducerClient) + + def __init__(self, tub, introducer_furl, + nickname, my_version, oldest_supported): + self._tub = tub + self.introducer_furl = introducer_furl + + assert type(nickname) is unicode + self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 + self._my_version = my_version + self._oldest_supported = oldest_supported + + self._published_announcements = set() + + self._publisher = None + + self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples + self._subscribed_service_names = set() + self._subscriptions = set() # requests we've actually sent + + # _current_announcements remembers one announcement per + # (servicename,serverid) pair. Anything that arrives with the same + # pair will displace the previous one. This stores unpacked + # announcement dictionaries, which can be compared for equality to + # distinguish re-announcement from updates. It also provides memory + # for clients who subscribe after startup. + self._current_announcements = {} + + self.encoding_parameters = None + + # hooks for unit tests + self._debug_counts = { + "inbound_message": 0, + "inbound_announcement": 0, + "wrong_service": 0, + "duplicate_announcement": 0, + "update": 0, + "new_announcement": 0, + "outbound_message": 0, + } + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res + + def startService(self): + service.Service.startService(self) + self._introducer_error = None + rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) + self._introducer_reconnector = rc + def connect_failed(failure): + self.log("Initial Introducer connection failed: perhaps it's down", + level=log.WEIRD, failure=failure, umid="c5MqUQ") + d = self._tub.getReference(self.introducer_furl) + d.addErrback(connect_failed) + + def _got_introducer(self, publisher): + self.log("connected to introducer, getting versions") + default = { "http://allmydata.org/tahoe/protocols/introducer/v1": + { }, + "application-version": "unknown: no get_version()", + } + d = rrefutil.add_version_to_remote_reference(publisher, default) + d.addCallback(self._got_versioned_introducer) + d.addErrback(self._got_error) + + def _got_error(self, f): + # TODO: for the introducer, perhaps this should halt the application + self._introducer_error = f # polled by tests + + def _got_versioned_introducer(self, publisher): + self.log("got introducer version: %s" % (publisher.version,)) + # we require a V1 introducer + needed = "http://allmydata.org/tahoe/protocols/introducer/v1" + if needed not in publisher.version: + raise InsufficientVersionError(needed, publisher.version) + self._publisher = publisher + publisher.notifyOnDisconnect(self._disconnected) + self._maybe_publish() + self._maybe_subscribe() + + def _disconnected(self): + self.log("bummer, we've lost our connection to the introducer") + self._publisher = None + self._subscriptions.clear() + + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + + def publish(self, furl, service_name, remoteinterface_name): + assert type(self._nickname_utf8) is str # we always send UTF-8 + ann = (furl, service_name, remoteinterface_name, + self._nickname_utf8, self._my_version, self._oldest_supported) + self._published_announcements.add(ann) + self._maybe_publish() + + def subscribe_to(self, service_name, cb, *args, **kwargs): + self._local_subscribers.append( (service_name,cb,args,kwargs) ) + self._subscribed_service_names.add(service_name) + self._maybe_subscribe() + for (servicename,nodeid),ann_d in self._current_announcements.items(): + if servicename == service_name: + eventually(cb, nodeid, ann_d) + + def _maybe_subscribe(self): + if not self._publisher: + self.log("want to subscribe, but no introducer yet", + level=log.NOISY) + return + for service_name in self._subscribed_service_names: + if service_name not in self._subscriptions: + # there is a race here, but the subscription desk ignores + # duplicate requests. + self._subscriptions.add(service_name) + self._debug_outstanding += 1 + d = self._publisher.callRemote("subscribe", self, service_name) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, format="server errored during subscribe", + facility="tahoe.introducer", + level=log.WEIRD, umid="2uMScQ") + + def _maybe_publish(self): + if not self._publisher: + self.log("want to publish, but no introducer yet", level=log.NOISY) + return + # this re-publishes everything. The Introducer ignores duplicates + for ann in self._published_announcements: + self._debug_counts["outbound_message"] += 1 + self._debug_outstanding += 1 + d = self._publisher.callRemote("publish", ann) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="server errored during publish %(ann)s", + ann=ann, facility="tahoe.introducer", + level=log.WEIRD, umid="xs9pVQ") + + + + def remote_announce(self, announcements): + self.log("received %d announcements" % len(announcements)) + self._debug_counts["inbound_message"] += 1 + for ann in announcements: + try: + self._process_announcement(ann) + except: + log.err(format="unable to process announcement %(ann)s", + ann=ann) + # Don't let a corrupt announcement prevent us from processing + # the remaining ones. Don't return an error to the server, + # since they'd just ignore it anyways. + pass + + def _process_announcement(self, ann): + self._debug_counts["inbound_announcement"] += 1 + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann + if service_name not in self._subscribed_service_names: + self.log("announcement for a service we don't care about [%s]" + % (service_name,), level=log.UNUSUAL, umid="dIpGNA") + self._debug_counts["wrong_service"] += 1 + return + self.log("announcement for [%s]: %s" % (service_name, ann), + umid="BoKEag") + assert type(furl) is str + assert type(service_name) is str + assert type(ri_name) is str + assert type(nickname_utf8) is str + nickname = nickname_utf8.decode("utf-8") + assert type(nickname) is unicode + assert type(ver) is str + assert type(oldest) is str + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + nodeid_s = idlib.shortnodeid_b2a(nodeid) + + ann_d = { "version": 0, + "service-name": service_name, + + "FURL": furl, + "nickname": nickname, + "app-versions": {}, # need #466 and v2 introducer + "my-version": ver, + "oldest-supported": oldest, + } + + index = (service_name, nodeid) + if self._current_announcements.get(index, None) == ann_d: + self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", + service=service_name, nodeid=nodeid_s, + level=log.UNUSUAL, umid="B1MIdA") + self._debug_counts["duplicate_announcement"] += 1 + return + if index in self._current_announcements: + self._debug_counts["update"] += 1 + else: + self._debug_counts["new_announcement"] += 1 + + self._current_announcements[index] = ann_d + # note: we never forget an index, but we might update its value + + for (service_name2,cb,args,kwargs) in self._local_subscribers: + if service_name2 == service_name: + eventually(cb, nodeid, ann_d, *args, **kwargs) + + def remote_set_encoding_parameters(self, parameters): + self.encoding_parameters = parameters + + def connected_to_introducer(self): + return bool(self._publisher) + +class IntroducerService_v1(service.MultiService, Referenceable): + implements(RIIntroducerPublisherAndSubscriberService_v1) + name = "introducer" + VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": + { }, + "application-version": str(allmydata.__full_version__), + } + + def __init__(self, basedir="."): + service.MultiService.__init__(self) + self.introducer_url = None + # 'index' is (service_name, tubid) + self._announcements = {} # dict of index -> (announcement, timestamp) + self._subscribers = {} # dict of (rref->timestamp) dicts + self._debug_counts = {"inbound_message": 0, + "inbound_duplicate": 0, + "inbound_update": 0, + "outbound_message": 0, + "outbound_announcements": 0, + "inbound_subscribe": 0} + self._debug_outstanding = 0 + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res + + def log(self, *args, **kwargs): + if "facility" not in kwargs: + kwargs["facility"] = "tahoe.introducer" + return log.msg(*args, **kwargs) + + def get_announcements(self): + return self._announcements + def get_subscribers(self): + return self._subscribers + + def remote_get_version(self): + return self.VERSION + + def remote_publish(self, announcement): + try: + self._publish(announcement) + except: + log.err(format="Introducer.remote_publish failed on %(ann)s", + ann=announcement, level=log.UNUSUAL, umid="620rWA") + raise + + def _publish(self, announcement): + self._debug_counts["inbound_message"] += 1 + self.log("introducer: announcement published: %s" % (announcement,) ) + (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement + #print "PUB", service_name, nickname_utf8 + + nodeid = b32decode(SturdyRef(furl).tubID.upper()) + index = (service_name, nodeid) + + if index in self._announcements: + (old_announcement, timestamp) = self._announcements[index] + if old_announcement == announcement: + self.log("but we already knew it, ignoring", level=log.NOISY) + self._debug_counts["inbound_duplicate"] += 1 + return + else: + self.log("old announcement being updated", level=log.NOISY) + self._debug_counts["inbound_update"] += 1 + self._announcements[index] = (announcement, time.time()) + + for s in self._subscribers.get(service_name, []): + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += 1 + self._debug_outstanding += 1 + d = s.callRemote("announce", set([announcement])) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored on announcement %(ann)s", + ann=announcement, facility="tahoe.introducer", + level=log.UNUSUAL, umid="jfGMXQ") + + def remote_subscribe(self, subscriber, service_name): + self.log("introducer: subscription[%s] request at %s" % (service_name, + subscriber)) + self._debug_counts["inbound_subscribe"] += 1 + if service_name not in self._subscribers: + self._subscribers[service_name] = {} + subscribers = self._subscribers[service_name] + if subscriber in subscribers: + self.log("but they're already subscribed, ignoring", + level=log.UNUSUAL) + return + subscribers[subscriber] = time.time() + def _remove(): + self.log("introducer: unsubscribing[%s] %s" % (service_name, + subscriber)) + subscribers.pop(subscriber, None) + subscriber.notifyOnDisconnect(_remove) + + announcements = set( + [ ann + for (sn2,nodeid),(ann,when) in self._announcements.items() + if sn2 == service_name] ) + + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += len(announcements) + self._debug_outstanding += 1 + d = subscriber.callRemote("announce", announcements) + d.addBoth(self._debug_retired) + d.addErrback(rrefutil.trap_deadref) + d.addErrback(log.err, + format="subscriber errored during subscribe %(anns)s", + anns=announcements, facility="tahoe.introducer", + level=log.UNUSUAL, umid="1XChxA") diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py index 10dc1a05..41f92b23 100644 --- a/src/allmydata/introducer/server.py +++ b/src/allmydata/introducer/server.py @@ -1,14 +1,16 @@ import time, os.path -from base64 import b32decode from zope.interface import implements from twisted.application import service -from foolscap.api import Referenceable, SturdyRef +from foolscap.api import Referenceable import allmydata from allmydata import node -from allmydata.util import log, rrefutil +from allmydata.util import log from allmydata.introducer.interfaces import \ - RIIntroducerPublisherAndSubscriberService + RIIntroducerPublisherAndSubscriberService_v2 +from allmydata.introducer.common import convert_announcement_v1_to_v2, \ + convert_announcement_v2_to_v1, unsign_from_foolscap, make_index, \ + get_tubid_string_from_ann class IntroducerNode(node.Node): PORTNUMFILE = "introducer.port" @@ -31,14 +33,15 @@ class IntroducerNode(node.Node): def _publish(res): self.introducer_url = self.tub.registerReference(introducerservice, "introducer") - self.log(" introducer is at %s" % self.introducer_url) + self.log(" introducer is at %s" % self.introducer_url, + umid="qF2L9A") self.write_config("introducer.furl", self.introducer_url + "\n") d.addCallback(_publish) d.addErrback(log.err, facility="tahoe.init", level=log.BAD, umid="UaNs9A") def init_web(self, webport): - self.log("init_web(webport=%s)", args=(webport,)) + self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA") from allmydata.webish import IntroducerWebishServer nodeurl_path = os.path.join(self.basedir, "node.url") @@ -47,105 +50,260 @@ class IntroducerNode(node.Node): ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir) self.add_service(ws) +class WrapV1SubscriberInV2Interface: # for_v1 + """I wrap a RemoteReference that points at an old v1 subscriber, enabling + it to be treated like a v2 subscriber. + """ + + def __init__(self, original): + self.original = original + def __eq__(self, them): + return self.original == them + def __ne__(self, them): + return self.original != them + def __hash__(self): + return hash(self.original) + def getRemoteTubID(self): + return self.original.getRemoteTubID() + def getSturdyRef(self): + return self.original.getSturdyRef() + def getPeer(self): + return self.original.getPeer() + def callRemote(self, methname, *args, **kwargs): + m = getattr(self, "wrap_" + methname) + return m(*args, **kwargs) + def wrap_announce_v2(self, announcements): + anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements] + return self.original.callRemote("announce", set(anns_v1)) + def wrap_set_encoding_parameters(self, parameters): + # note: unused + return self.original.callRemote("set_encoding_parameters", parameters) + def notifyOnDisconnect(self, *args, **kwargs): + return self.original.notifyOnDisconnect(*args, **kwargs) + class IntroducerService(service.MultiService, Referenceable): - implements(RIIntroducerPublisherAndSubscriberService) + implements(RIIntroducerPublisherAndSubscriberService_v2) name = "introducer" - VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": - { }, + # v1 is the original protocol, supported since 1.0 (but only advertised + # starting in 1.3). v2 is the new signed protocol, supported after 1.9 + VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": { }, + "http://allmydata.org/tahoe/protocols/introducer/v2": { }, "application-version": str(allmydata.__full_version__), } def __init__(self, basedir="."): service.MultiService.__init__(self) self.introducer_url = None - # 'index' is (service_name, tubid) - self._announcements = {} # dict of index -> (announcement, timestamp) - self._subscribers = {} # dict of (rref->timestamp) dicts + # 'index' is (service_name, key_s, tubid), where key_s or tubid is + # None + self._announcements = {} # dict of index -> + # (ann_t, canary, ann, timestamp) + + # ann (the announcement dictionary) is cleaned up: nickname is always + # unicode, servicename is always ascii, etc, even though + # simplejson.loads sometimes returns either + + # self._subscribers is a dict mapping servicename to subscriptions + # 'subscriptions' is a dict mapping rref to a subscription + # 'subscription' is a tuple of (subscriber_info, timestamp) + # 'subscriber_info' is a dict, provided directly for v2 clients, or + # synthesized for v1 clients. The expected keys are: + # version, nickname, app-versions, my-version, oldest-supported + self._subscribers = {} + + # self._stub_client_announcements contains the information provided + # by v1 clients. We stash this so we can match it up with their + # subscriptions. + self._stub_client_announcements = {} # maps tubid to sinfo # for_v1 + self._debug_counts = {"inbound_message": 0, "inbound_duplicate": 0, "inbound_update": 0, "outbound_message": 0, "outbound_announcements": 0, "inbound_subscribe": 0} + self._debug_outstanding = 0 # also covers WrapV1SubscriberInV2Interface + + def _debug_retired(self, res): + self._debug_outstanding -= 1 + return res def log(self, *args, **kwargs): if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" + kwargs["facility"] = "tahoe.introducer.server" return log.msg(*args, **kwargs) def get_announcements(self): return self._announcements def get_subscribers(self): - return self._subscribers + """Return a list of (service_name, when, subscriber_info, rref) for + all subscribers. subscriber_info is a dict with the following keys: + version, nickname, app-versions, my-version, oldest-supported""" + s = [] + for service_name, subscriptions in self._subscribers.items(): + for rref,(subscriber_info,when) in subscriptions.items(): + s.append( (service_name, when, subscriber_info, rref) ) + return s def remote_get_version(self): return self.VERSION - def remote_publish(self, announcement): + def remote_publish(self, ann_t): # for_v1 + lp = self.log("introducer: old (v1) announcement published: %s" + % (ann_t,), umid="6zGOIw") + ann_v2 = convert_announcement_v1_to_v2(ann_t) + return self.publish(ann_v2, None, lp) + + def remote_publish_v2(self, ann_t, canary): + lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ") + return self.publish(ann_t, canary, lp) + + def publish(self, ann_t, canary, lp): try: - self._publish(announcement) + self._publish(ann_t, canary, lp) except: log.err(format="Introducer.remote_publish failed on %(ann)s", - ann=announcement, level=log.UNUSUAL, umid="620rWA") + ann=ann_t, + level=log.UNUSUAL, parent=lp, umid="620rWA") raise - def _publish(self, announcement): + def _publish(self, ann_t, canary, lp): self._debug_counts["inbound_message"] += 1 - self.log("introducer: announcement published: %s" % (announcement,) ) - (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement + self.log("introducer: announcement published: %s" % (ann_t,), + umid="wKHgCw") + ann, key = unsign_from_foolscap(ann_t) # might raise BadSignatureError + index = make_index(ann, key) - nodeid = b32decode(SturdyRef(furl).tubID.upper()) - index = (service_name, nodeid) + service_name = str(ann["service-name"]) + if service_name == "stub_client": # for_v1 + self._attach_stub_client(ann, lp) + return - if index in self._announcements: - (old_announcement, timestamp) = self._announcements[index] - if old_announcement == announcement: - self.log("but we already knew it, ignoring", level=log.NOISY) + old = self._announcements.get(index) + if old: + (old_ann_t, canary, old_ann, timestamp) = old + if old_ann == ann: + self.log("but we already knew it, ignoring", level=log.NOISY, + umid="myxzLw") self._debug_counts["inbound_duplicate"] += 1 return else: - self.log("old announcement being updated", level=log.NOISY) + self.log("old announcement being updated", level=log.NOISY, + umid="304r9g") self._debug_counts["inbound_update"] += 1 - self._announcements[index] = (announcement, time.time()) + self._announcements[index] = (ann_t, canary, ann, time.time()) + #if canary: + # canary.notifyOnDisconnect ... + # use a CanaryWatcher? with cw.is_connected()? + # actually we just want foolscap to give rref.is_connected(), since + # this is only for the status display for s in self._subscribers.get(service_name, []): self._debug_counts["outbound_message"] += 1 self._debug_counts["outbound_announcements"] += 1 - d = s.callRemote("announce", set([announcement])) - d.addErrback(rrefutil.trap_deadref) + self._debug_outstanding += 1 + d = s.callRemote("announce_v2", set([ann_t])) + d.addBoth(self._debug_retired) d.addErrback(log.err, format="subscriber errored on announcement %(ann)s", - ann=announcement, facility="tahoe.introducer", + ann=ann_t, facility="tahoe.introducer", level=log.UNUSUAL, umid="jfGMXQ") - def remote_subscribe(self, subscriber, service_name): - self.log("introducer: subscription[%s] request at %s" % (service_name, - subscriber)) + def _attach_stub_client(self, ann, lp): + # There might be a v1 subscriber for whom this is a stub_client. + # We might have received the subscription before the stub_client + # announcement, in which case we now need to fix up the record in + # self._subscriptions . + + # record it for later, in case the stub_client arrived before the + # subscription + subscriber_info = self._get_subscriber_info_from_ann(ann) + ann_tubid = get_tubid_string_from_ann(ann) + self._stub_client_announcements[ann_tubid] = subscriber_info + + lp2 = self.log("stub_client announcement, " + "looking for matching subscriber", + parent=lp, level=log.NOISY, umid="BTywDg") + + for sn in self._subscribers: + s = self._subscribers[sn] + for (subscriber, info) in s.items(): + # we correlate these by looking for a subscriber whose tubid + # matches this announcement + sub_tubid = subscriber.getRemoteTubID() + if sub_tubid == ann_tubid: + self.log(format="found a match, nodeid=%(nodeid)s", + nodeid=sub_tubid, + level=log.NOISY, parent=lp2, umid="xsWs1A") + # found a match. Does it need info? + if not info[0]: + self.log(format="replacing info", + level=log.NOISY, parent=lp2, umid="m5kxwA") + # yup + s[subscriber] = (subscriber_info, info[1]) + # and we don't remember or announce stub_clients beyond what we + # need to get the subscriber_info set up + + def _get_subscriber_info_from_ann(self, ann): # for_v1 + sinfo = { "version": ann["version"], + "nickname": ann["nickname"], + "app-versions": ann["app-versions"], + "my-version": ann["my-version"], + "oldest-supported": ann["oldest-supported"], + } + return sinfo + + def remote_subscribe(self, subscriber, service_name): # for_v1 + self.log("introducer: old (v1) subscription[%s] request at %s" + % (service_name, subscriber), umid="hJlGUg") + return self.add_subscriber(WrapV1SubscriberInV2Interface(subscriber), + service_name, None) + + def remote_subscribe_v2(self, subscriber, service_name, subscriber_info): + self.log("introducer: subscription[%s] request at %s" + % (service_name, subscriber), umid="U3uzLg") + return self.add_subscriber(subscriber, service_name, subscriber_info) + + def add_subscriber(self, subscriber, service_name, subscriber_info): self._debug_counts["inbound_subscribe"] += 1 if service_name not in self._subscribers: self._subscribers[service_name] = {} subscribers = self._subscribers[service_name] if subscriber in subscribers: self.log("but they're already subscribed, ignoring", - level=log.UNUSUAL) + level=log.UNUSUAL, umid="Sy9EfA") return - subscribers[subscriber] = time.time() + + if not subscriber_info: # for_v1 + # v1 clients don't provide subscriber_info, but they should + # publish a 'stub client' record which contains the same + # information. If we've already received this, it will be in + # self._stub_client_announcements + tubid = subscriber.getRemoteTubID() + if tubid in self._stub_client_announcements: + subscriber_info = self._stub_client_announcements[tubid] + + subscribers[subscriber] = (subscriber_info, time.time()) def _remove(): self.log("introducer: unsubscribing[%s] %s" % (service_name, - subscriber)) + subscriber), + umid="vYGcJg") subscribers.pop(subscriber, None) subscriber.notifyOnDisconnect(_remove) - announcements = set( - [ ann - for (sn2,nodeid),(ann,when) in self._announcements.items() - if sn2 == service_name] ) - - self._debug_counts["outbound_message"] += 1 - self._debug_counts["outbound_announcements"] += len(announcements) - d = subscriber.callRemote("announce", announcements) - d.addErrback(rrefutil.trap_deadref) - d.addErrback(log.err, - format="subscriber errored during subscribe %(anns)s", - anns=announcements, facility="tahoe.introducer", - level=log.UNUSUAL, umid="mtZepQ") + # now tell them about any announcements they're interested in + announcements = set( [ ann_t + for idx,(ann_t,canary,ann,when) + in self._announcements.items() + if idx[0] == service_name] ) + if announcements: + self._debug_counts["outbound_message"] += 1 + self._debug_counts["outbound_announcements"] += len(announcements) + self._debug_outstanding += 1 + d = subscriber.callRemote("announce_v2", announcements) + d.addBoth(self._debug_retired) + d.addErrback(log.err, + format="subscriber errored during subscribe %(anns)s", + anns=announcements, facility="tahoe.introducer", + level=log.UNUSUAL, umid="mtZepQ") + return d diff --git a/src/allmydata/node.py b/src/allmydata/node.py index 0b843650..06b707de 100644 --- a/src/allmydata/node.py +++ b/src/allmydata/node.py @@ -195,6 +195,19 @@ class Node(service.MultiService): # TODO: merge this with allmydata.get_package_versions return dict(app_versions.versions) + def get_config_from_file(self, name, required=False): + """Get the (string) contents of a config file, or None if the file + did not exist. If required=True, raise an exception rather than + returning None. Any leading or trailing whitespace will be stripped + from the data.""" + fn = os.path.join(self.basedir, name) + try: + return fileutil.read(fn).strip() + except EnvironmentError: + if not required: + return None + raise + def write_private_config(self, name, value): """Write the (string) contents of a private config file (which is a config file that resides within the subdirectory named 'private'), and diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 1f39c9ca..c5e5b392 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -99,6 +99,11 @@ class StorageServer(service.MultiService, Referenceable): def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) + def have_shares(self): + # quick test to decide if we need to commit to an implicit + # permutation-seed or if we should use a new one + return bool(set(os.listdir(self.sharedir)) - set(["incoming"])) + def add_bucket_counter(self): statefile = os.path.join(self.storedir, "bucket_counter.state") self.bucket_counter = BucketCountingCrawler(self, statefile) diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index 7a8ce0c4..9d36a8bd 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -29,11 +29,11 @@ the foolscap-based server implemented in src/allmydata/storage/*.py . # 6: implement other sorts of IStorageClient classes: S3, etc -import time +import re, time from zope.interface import implements, Interface from foolscap.api import eventually from allmydata.interfaces import IStorageBroker -from allmydata.util import idlib, log +from allmydata.util import log, base32 from allmydata.util.assertutil import precondition from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.util.hashutil import sha1 @@ -74,8 +74,8 @@ class StorageFarmBroker: self.introducer_client = None # these two are used in unit tests - def test_add_rref(self, serverid, rref): - s = NativeStorageServer(serverid, {}) + def test_add_rref(self, serverid, rref, ann): + s = NativeStorageServer(serverid, ann.copy()) s.rref = rref self.servers[serverid] = s @@ -86,21 +86,23 @@ class StorageFarmBroker: self.introducer_client = ic = introducer_client ic.subscribe_to("storage", self._got_announcement) - def _got_announcement(self, serverid, ann_d): - precondition(isinstance(serverid, str), serverid) - precondition(len(serverid) == 20, serverid) - assert ann_d["service-name"] == "storage" + def _got_announcement(self, key_s, ann): + if key_s is not None: + precondition(isinstance(key_s, str), key_s) + precondition(key_s.startswith("v0-"), key_s) + assert ann["service-name"] == "storage" + s = NativeStorageServer(key_s, ann) + serverid = s.get_serverid() old = self.servers.get(serverid) if old: - if old.get_announcement() == ann_d: + if old.get_announcement() == ann: return # duplicate # replacement del self.servers[serverid] old.stop_connecting() # now we forget about them and start using the new one - dsc = NativeStorageServer(serverid, ann_d) - self.servers[serverid] = dsc - dsc.start_connecting(self.tub, self._trigger_connections) + self.servers[serverid] = s + s.start_connecting(self.tub, self._trigger_connections) # the descriptor will manage their own Reconnector, and each time we # need servers, we'll ask them if they're connected or not. @@ -173,13 +175,25 @@ class NativeStorageServer: "application-version": "unknown: no get_version()", } - def __init__(self, serverid, ann_d, min_shares=1): - self.serverid = serverid - self._tubid = serverid - self.announcement = ann_d + def __init__(self, key_s, ann, min_shares=1): + self.key_s = key_s + self.announcement = ann self.min_shares = min_shares - self.serverid_s = idlib.shortnodeid_b2a(self.serverid) + assert "anonymous-storage-FURL" in ann, ann + furl = str(ann["anonymous-storage-FURL"]) + m = re.match(r'pb://(\w+)@', furl) + assert m, furl + tubid_s = m.group(1).lower() + self._tubid = base32.a2b(tubid_s) + assert "permutation-seed-base32" in ann, ann + ps = base32.a2b(str(ann["permutation-seed-base32"])) + self._permutation_seed = ps + + name = key_s or tubid_s + self._long_description = name + self._short_description = name[:8] # TODO: decide who adds [] + self.announcement_time = time.time() self.last_connect_time = None self.last_loss_time = None @@ -191,17 +205,17 @@ class NativeStorageServer: def __repr__(self): return "" % self.get_name() def get_serverid(self): - return self._tubid + return self._tubid # XXX replace with self.key_s def get_permutation_seed(self): - return self._tubid + return self._permutation_seed def get_version(self): if self.rref: return self.rref.version return None def get_name(self): # keep methodname short - return self.serverid_s + return self._short_description def get_longname(self): - return idlib.nodeid_b2a(self._tubid) + return self._long_description def get_lease_seed(self): return self._tubid def get_foolscap_write_enabler_seed(self): @@ -221,7 +235,7 @@ class NativeStorageServer: return self.announcement_time def start_connecting(self, tub, trigger_cb): - furl = self.announcement["FURL"] + furl = str(self.announcement["anonymous-storage-FURL"]) self._trigger_cb = trigger_cb self._reconnector = tub.connectTo(furl, self._got_connection) diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 6ca51139..7ca19f8a 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -22,15 +22,16 @@ class WebResultsRendering(unittest.TestCase, WebRenderingMixin): for (peerid, nickname) in [("\x00"*20, "peer-0"), ("\xff"*20, "peer-f"), ("\x11"*20, "peer-11")] : - ann_d = { "version": 0, - "service-name": "storage", - "FURL": "fake furl", - "nickname": unicode(nickname), - "app-versions": {}, # need #466 and v2 introducer - "my-version": "ver", - "oldest-supported": "oldest", - } - s = NativeStorageServer(peerid, ann_d) + ann = { "version": 0, + "service-name": "storage", + "anonymous-storage-FURL": "pb://abcde@nowhere/fake", + "permutation-seed-base32": "", + "nickname": unicode(nickname), + "app-versions": {}, # need #466 and v2 introducer + "my-version": "ver", + "oldest-supported": "oldest", + } + s = NativeStorageServer(peerid, ann) sb.test_add_server(peerid, s) c = FakeClient() c.storage_broker = sb diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index 14c9c791..53734648 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -136,12 +136,14 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase): self.failUnlessEqual(c.getServiceNamed("storage").reserved_space, 0) def _permute(self, sb, key): - return [ s.get_serverid() for s in sb.get_servers_for_psi(key) ] + return [ s.get_longname() for s in sb.get_servers_for_psi(key) ] def test_permute(self): sb = StorageFarmBroker(None, True) for k in ["%d" % i for i in range(5)]: - sb.test_add_rref(k, "rref") + ann = {"anonymous-storage-FURL": "pb://abcde@nowhere/fake", + "permutation-seed-base32": base32.b2a(k) } + sb.test_add_rref(k, "rref", ann) self.failUnlessReallyEqual(self._permute(sb, "one"), ['3','1','0','4','2']) self.failUnlessReallyEqual(self._permute(sb, "two"), ['0','4','2','1','3']) diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 9d0f50eb..b8dd50b3 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -1,6 +1,7 @@ import os, re from base64 import b32decode +import simplejson from twisted.trial import unittest from twisted.internet import defer @@ -9,11 +10,16 @@ from twisted.python import log from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue from twisted.application import service from allmydata.interfaces import InsufficientVersionError -from allmydata.introducer.client import IntroducerClient +from allmydata.introducer.client import IntroducerClient, \ + WrapV2ClientInV1Interface from allmydata.introducer.server import IntroducerService +from allmydata.introducer.common import get_tubid_string_from_ann, \ + get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \ + UnknownKeyError +from allmydata.introducer import old # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode -from allmydata.util import pollmixin +from allmydata.util import pollmixin, keyutil import allmydata.test.common_util as testutil class LoggingMultiService(service.MultiService): @@ -47,14 +53,14 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): def test_create(self): ic = IntroducerClient(None, "introducer.furl", u"my_nickname", - "my_version", "oldest_version") + "my_version", "oldest_version", {}) self.failUnless(isinstance(ic, IntroducerClient)) def test_listen(self): i = IntroducerService() i.setServiceParent(self.parent) - def test_duplicate(self): + def test_duplicate_publish(self): i = IntroducerService() self.failUnlessEqual(len(i.get_announcements()), 0) self.failUnlessEqual(len(i.get_subscribers()), 0) @@ -73,6 +79,223 @@ class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): self.failUnlessEqual(len(i.get_announcements()), 2) self.failUnlessEqual(len(i.get_subscribers()), 0) + def test_id_collision(self): + # test replacement case where tubid equals a keyid (one should + # not replace the other) + i = IntroducerService() + ic = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "my_version", "oldest_version", {}) + sk_s, vk_s = keyutil.make_keypair() + sk, _ignored = keyutil.parse_privkey(sk_s) + keyid = keyutil.remove_prefix(vk_s, "pub-v0-") + furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") + ann_t = ic.create_announcement("storage", make_ann(furl1), sk) + i.remote_publish_v2(ann_t, Referenceable()) + announcements = i.get_announcements() + self.failUnlessEqual(len(announcements), 1) + key1 = ("storage", "v0-"+keyid, None) + self.failUnless(key1 in announcements) + (ign, ign, ann1_out, ign) = announcements[key1] + self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1) + + furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid + ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0") + i.remote_publish(ann2) + self.failUnlessEqual(len(announcements), 2) + key2 = ("storage", None, keyid) + self.failUnless(key2 in announcements) + (ign, ign, ann2_out, ign) = announcements[key2] + self.failUnlessEqual(ann2_out["anonymous-storage-FURL"], furl2) + + +def make_ann(furl): + ann = { "anonymous-storage-FURL": furl, + "permutation-seed-base32": get_tubid_string(furl) } + return ann + +def make_ann_t(ic, furl, privkey): + return ic.create_announcement("storage", make_ann(furl), privkey) + +class Client(unittest.TestCase): + def test_duplicate_receive_v1(self): + ic = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "my_version", "oldest_version", {}) + announcements = [] + ic.subscribe_to("storage", + lambda key_s,ann: announcements.append(ann)) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" + ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") + ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") + ca = WrapV2ClientInV1Interface(ic) + + ca.remote_announce([ann1]) + d = fireEventually() + def _then(ign): + self.failUnlessEqual(len(announcements), 1) + self.failUnlessEqual(announcements[0]["nickname"], u"nick1") + self.failUnlessEqual(announcements[0]["my-version"], "ver23") + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 0) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0) + # now send a duplicate announcement: this should not notify clients + ca.remote_announce([ann1]) + return fireEventually() + d.addCallback(_then) + def _then2(ign): + self.failUnlessEqual(len(announcements), 1) + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 0) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) + # and a replacement announcement: same FURL, new other stuff. + # Clients should be notified. + ca.remote_announce([ann1b]) + return fireEventually() + d.addCallback(_then2) + def _then3(ign): + self.failUnlessEqual(len(announcements), 2) + self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3) + self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) + self.failUnlessEqual(ic._debug_counts["update"], 1) + self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) + # test that the other stuff changed + self.failUnlessEqual(announcements[-1]["nickname"], u"nick1") + self.failUnlessEqual(announcements[-1]["my-version"], "ver24") + d.addCallback(_then3) + return d + + def test_duplicate_receive_v2(self): + ic1 = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "ver23", "oldest_version", {}) + # we use a second client just to create a different-looking + # announcement + ic2 = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "ver24","oldest_version",{}) + announcements = [] + def _received(key_s, ann): + announcements.append( (key_s, ann) ) + ic1.subscribe_to("storage", _received) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" + furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp" + furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo" + + privkey_s, pubkey_vs = keyutil.make_keypair() + privkey, _ignored = keyutil.parse_privkey(privkey_s) + pubkey_s = keyutil.remove_prefix(pubkey_vs, "pub-") + + # ann1: ic1, furl1 + # ann1a: ic1, furl1a (same SturdyRef, different connection hints) + # ann1b: ic2, furl1 + # ann2: ic2, furl2 + + self.ann1 = make_ann_t(ic1, furl1, privkey) + self.ann1a = make_ann_t(ic1, furl1a, privkey) + self.ann1b = make_ann_t(ic2, furl1, privkey) + self.ann2 = make_ann_t(ic2, furl2, privkey) + + ic1.remote_announce_v2([self.ann1]) # queues eventual-send + d = fireEventually() + def _then1(ign): + self.failUnlessEqual(len(announcements), 1) + key_s,ann = announcements[0] + self.failUnlessEqual(key_s, pubkey_s) + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) + self.failUnlessEqual(ann["my-version"], "ver23") + d.addCallback(_then1) + + # now send a duplicate announcement. This should not fire the + # subscriber + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1])) + d.addCallback(fireEventually) + def _then2(ign): + self.failUnlessEqual(len(announcements), 1) + d.addCallback(_then2) + + # and a replacement announcement: same FURL, new other stuff. The + # subscriber *should* be fired. + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b])) + d.addCallback(fireEventually) + def _then3(ign): + self.failUnlessEqual(len(announcements), 2) + key_s,ann = announcements[-1] + self.failUnlessEqual(key_s, pubkey_s) + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) + self.failUnlessEqual(ann["my-version"], "ver24") + d.addCallback(_then3) + + # and a replacement announcement with a different FURL (it uses + # different connection hints) + d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a])) + d.addCallback(fireEventually) + def _then4(ign): + self.failUnlessEqual(len(announcements), 3) + key_s,ann = announcements[-1] + self.failUnlessEqual(key_s, pubkey_s) + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a) + self.failUnlessEqual(ann["my-version"], "ver23") + d.addCallback(_then4) + + # now add a new subscription, which should be called with the + # backlog. The introducer only records one announcement per index, so + # the backlog will only have the latest message. + announcements2 = [] + def _received2(key_s, ann): + announcements2.append( (key_s, ann) ) + d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2)) + d.addCallback(fireEventually) + def _then5(ign): + self.failUnlessEqual(len(announcements2), 1) + key_s,ann = announcements2[-1] + self.failUnlessEqual(key_s, pubkey_s) + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1a) + self.failUnlessEqual(ann["my-version"], "ver23") + d.addCallback(_then5) + return d + + def test_id_collision(self): + # test replacement case where tubid equals a keyid (one should + # not replace the other) + ic = IntroducerClient(None, + "introducer.furl", u"my_nickname", + "my_version", "oldest_version", {}) + announcements = [] + ic.subscribe_to("storage", + lambda key_s,ann: announcements.append(ann)) + sk_s, vk_s = keyutil.make_keypair() + sk, _ignored = keyutil.parse_privkey(sk_s) + keyid = keyutil.remove_prefix(vk_s, "pub-v0-") + furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") + furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid + ann_t = ic.create_announcement("storage", make_ann(furl1), sk) + ic.remote_announce_v2([ann_t]) + d = fireEventually() + def _then(ign): + # first announcement has been processed + self.failUnlessEqual(len(announcements), 1) + self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"], + furl1) + # now submit a second one, with a tubid that happens to look just + # like the pubkey-based serverid we just processed. They should + # not overlap. + ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0") + ca = WrapV2ClientInV1Interface(ic) + ca.remote_announce([ann2]) + return fireEventually() + d.addCallback(_then) + def _then2(ign): + # if they overlapped, the second announcement would be ignored + self.failUnlessEqual(len(announcements), 2) + self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"], + furl2) + d.addCallback(_then2) + return d + + class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): def create_tub(self, portnum=0): @@ -88,36 +311,86 @@ class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): assert self.central_portnum == portnum tub.setLocation("localhost:%d" % self.central_portnum) -class SystemTest(SystemTestMixin, unittest.TestCase): - - def test_system(self): - self.basedir = "introducer/SystemTest/system" +class Queue(SystemTestMixin, unittest.TestCase): + def test_queue_until_connected(self): + self.basedir = "introducer/QueueUntilConnected/queued" os.makedirs(self.basedir) - return self.do_system_test(IntroducerService) - test_system.timeout = 480 # occasionally takes longer than 350s on "draco" + self.create_tub() + introducer = IntroducerService() + introducer.setServiceParent(self.parent) + iff = os.path.join(self.basedir, "introducer.furl") + ifurl = self.central_tub.registerReference(introducer, furlFile=iff) + tub2 = Tub() + tub2.setServiceParent(self.parent) + c = IntroducerClient(tub2, ifurl, + u"nickname", "version", "oldest", {}) + furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short") + sk_s, vk_s = keyutil.make_keypair() + sk, _ignored = keyutil.parse_privkey(sk_s) + + d = introducer.disownServiceParent() + def _offline(ign): + # now that the introducer server is offline, create a client and + # publish some messages + c.setServiceParent(self.parent) # this starts the reconnector + c.publish("storage", make_ann(furl1), sk) + + introducer.setServiceParent(self.parent) # restart the server + # now wait for the messages to be delivered + def _got_announcement(): + return bool(introducer.get_announcements()) + return self.poll(_got_announcement) + d.addCallback(_offline) + def _done(ign): + v = list(introducer.get_announcements().values())[0] + (ign, ign, ann1_out, ign) = v + self.failUnlessEqual(ann1_out["anonymous-storage-FURL"], furl1) + d.addCallback(_done) + + # now let the ack get back + def _wait_until_idle(ign): + def _idle(): + if c._debug_outstanding: + return False + if introducer._debug_outstanding: + return False + return True + return self.poll(_idle) + d.addCallback(_wait_until_idle) + return d - def do_system_test(self, create_introducer): + +V1 = "v1"; V2 = "v2" +class SystemTest(SystemTestMixin, unittest.TestCase): + + def do_system_test(self, server_version): self.create_tub() - introducer = create_introducer() + if server_version == V1: + introducer = old.IntroducerService_v1() + else: + introducer = IntroducerService() introducer.setServiceParent(self.parent) iff = os.path.join(self.basedir, "introducer.furl") tub = self.central_tub ifurl = self.central_tub.registerReference(introducer, furlFile=iff) self.introducer_furl = ifurl - NUMCLIENTS = 5 - # we have 5 clients who publish themselves, and an extra one does - # which not. When the connections are fully established, all six nodes + # we have 5 clients who publish themselves as storage servers, and a + # sixth which does which not. All 6 clients subscriber to hear about + # storage. When the connections are fully established, all six nodes # should have 5 connections each. + NUM_STORAGE = 5 + NUM_CLIENTS = 6 clients = [] tubs = {} received_announcements = {} - NUM_SERVERS = NUMCLIENTS subscribing_clients = [] publishing_clients = [] + privkeys = {} + expected_announcements = [0 for c in range(NUM_CLIENTS)] - for i in range(NUMCLIENTS+1): + for i in range(NUM_CLIENTS): tub = Tub() #tub.setOption("logLocalFailures", True) #tub.setOption("logRemoteFailures", True) @@ -128,62 +401,171 @@ class SystemTest(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) log.msg("creating client %d: %s" % (i, tub.getShortTubID())) - c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, - "version", "oldest") + if i == 0: + c = old.IntroducerClient_v1(tub, self.introducer_furl, + u"nickname-%d" % i, + "version", "oldest") + else: + c = IntroducerClient(tub, self.introducer_furl, + u"nickname-%d" % i, + "version", "oldest", + {"component": "component-v1"}) received_announcements[c] = {} - def got(serverid, ann_d, announcements): - announcements[serverid] = ann_d - c.subscribe_to("storage", got, received_announcements[c]) + def got(key_s_or_tubid, ann, announcements, i): + if i == 0: + index = get_tubid_string_from_ann(ann) + else: + index = key_s_or_tubid or get_tubid_string_from_ann(ann) + announcements[index] = ann + c.subscribe_to("storage", got, received_announcements[c], i) subscribing_clients.append(c) - - if i < NUMCLIENTS: - node_furl = tub.registerReference(Referenceable()) - c.publish(node_furl, "storage", "ri_name") + expected_announcements[i] += 1 # all expect a 'storage' announcement + + node_furl = tub.registerReference(Referenceable()) + if i < NUM_STORAGE: + if i == 0: + c.publish(node_furl, "storage", "ri_name") + elif i == 1: + # sign the announcement + privkey_s, pubkey_s = keyutil.make_keypair() + privkey, _ignored = keyutil.parse_privkey(privkey_s) + privkeys[c] = privkey + c.publish("storage", make_ann(node_furl), privkey) + else: + c.publish("storage", make_ann(node_furl)) publishing_clients.append(c) - # the last one does not publish anything + else: + # the last one does not publish anything + pass + + if i == 0: + # users of the V1 client were required to publish a + # 'stub_client' record (somewhat after they published the + # 'storage' record), so the introducer could see their + # version. Match that behavior. + c.publish(node_furl, "stub_client", "stub_ri_name") + + if i == 2: + # also publish something that nobody cares about + boring_furl = tub.registerReference(Referenceable()) + c.publish("boring", make_ann(boring_furl)) c.setServiceParent(self.parent) clients.append(c) tubs[c] = tub - def _wait_for_all_connections(): - for c in subscribing_clients: - if len(received_announcements[c]) < NUM_SERVERS: + + def _wait_for_connected(ign): + def _connected(): + for c in clients: + if not c.connected_to_introducer(): + return False + return True + return self.poll(_connected) + + # we watch the clients to determine when the system has settled down. + # Then we can look inside the server to assert things about its + # state. + + def _wait_for_expected_announcements(ign): + def _got_expected_announcements(): + for i,c in enumerate(subscribing_clients): + if len(received_announcements[c]) < expected_announcements[i]: + return False + return True + return self.poll(_got_expected_announcements) + + # before shutting down any Tub, we'd like to know that there are no + # messages outstanding + + def _wait_until_idle(ign): + def _idle(): + for c in subscribing_clients + publishing_clients: + if c._debug_outstanding: + return False + if introducer._debug_outstanding: return False - return True - d = self.poll(_wait_for_all_connections) + return True + return self.poll(_idle) + + d = defer.succeed(None) + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) def _check1(res): log.msg("doing _check1") dc = introducer._debug_counts - self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) - self.failUnlessEqual(dc["inbound_duplicate"], 0) + if server_version == V1: + # each storage server publishes a record, and (after its + # 'subscribe' has been ACKed) also publishes a "stub_client". + # The non-storage client (which subscribes) also publishes a + # stub_client. There is also one "boring" service. The number + # of messages is higher, because the stub_clients aren't + # published until after we get the 'subscribe' ack (since we + # don't realize that we're dealing with a v1 server [which + # needs stub_clients] until then), and the act of publishing + # the stub_client causes us to re-send all previous + # announcements. + self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"], + NUM_STORAGE + NUM_CLIENTS + 1) + else: + # each storage server publishes a record. There is also one + # "stub_client" and one "boring" + self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) + self.failUnlessEqual(dc["inbound_duplicate"], 0) self.failUnlessEqual(dc["inbound_update"], 0) - self.failUnless(dc["outbound_message"]) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) + # the number of outbound messages is tricky.. I think it depends + # upon a race between the publish and the subscribe messages. + self.failUnless(dc["outbound_message"] > 0) + # each client subscribes to "storage", and each server publishes + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) - for c in clients: - self.failUnless(c.connected_to_introducer()) for c in subscribing_clients: cdc = c._debug_counts self.failUnless(cdc["inbound_message"]) self.failUnlessEqual(cdc["inbound_announcement"], - NUM_SERVERS) + NUM_STORAGE) self.failUnlessEqual(cdc["wrong_service"], 0) self.failUnlessEqual(cdc["duplicate_announcement"], 0) self.failUnlessEqual(cdc["update"], 0) self.failUnlessEqual(cdc["new_announcement"], - NUM_SERVERS) + NUM_STORAGE) anns = received_announcements[c] - self.failUnlessEqual(len(anns), NUM_SERVERS) + self.failUnlessEqual(len(anns), NUM_STORAGE) - nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) - ann_d = anns[nodeid0] - nick = ann_d["nickname"] + nodeid0 = tubs[clients[0]].tubID + ann = anns[nodeid0] + nick = ann["nickname"] self.failUnlessEqual(type(nick), unicode) self.failUnlessEqual(nick, u"nickname-0") - for c in publishing_clients: - cdc = c._debug_counts - self.failUnlessEqual(cdc["outbound_message"], 1) + if server_version == V1: + for c in publishing_clients: + cdc = c._debug_counts + expected = 1 # storage + if c is clients[2]: + expected += 1 # boring + if c is not clients[0]: + # the v2 client tries to call publish_v2, which fails + # because the server is v1. It then re-sends + # everything it has so far, plus a stub_client record + expected = 2*expected + 1 + if c is clients[0]: + # we always tell v1 client to send stub_client + expected += 1 + self.failUnlessEqual(cdc["outbound_message"], expected) + else: + for c in publishing_clients: + cdc = c._debug_counts + expected = 1 + if c in [clients[0], # stub_client + clients[2], # boring + ]: + expected = 2 + self.failUnlessEqual(cdc["outbound_message"], expected) + log.msg("_check1 done") d.addCallback(_check1) # force an introducer reconnect, by shutting down the Tub it's using @@ -196,67 +578,54 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) - def _wait_for_introducer_loss(): - for c in clients: - if c.connected_to_introducer(): - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + def _wait_for_introducer_loss(ign): + def _introducer_lost(): + for c in clients: + if c.connected_to_introducer(): + return False + return True + return self.poll(_introducer_lost) + d.addCallback(_wait_for_introducer_loss) def _restart_introducer_tub(_ign): log.msg("restarting introducer's Tub") - - dc = introducer._debug_counts - self.expected_count = dc["inbound_message"] + NUM_SERVERS - self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 - introducer._debug0 = dc["outbound_message"] - for c in subscribing_clients: - cdc = c._debug_counts - c._debug0 = cdc["inbound_message"] - + # reset counters + for i in range(NUM_CLIENTS): + c = subscribing_clients[i] + for k in c._debug_counts: + c._debug_counts[k] = 0 + for k in introducer._debug_counts: + introducer._debug_counts[k] = 0 + expected_announcements[i] += 1 # new 'storage' for everyone self.create_tub(self.central_portnum) newfurl = self.central_tub.registerReference(introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer_tub) - def _wait_for_introducer_reconnect(): - # wait until: - # all clients are connected - # the introducer has received publish messages from all of them - # the introducer has received subscribe messages from all of them - # the introducer has sent (duplicate) announcements to all of them - # all clients have received (duplicate) announcements - dc = introducer._debug_counts - for c in clients: - if not c.connected_to_introducer(): - return False - if dc["inbound_message"] < self.expected_count: - return False - if dc["inbound_subscribe"] < self.expected_subscribe_count: - return False - for c in subscribing_clients: - cdc = c._debug_counts - if cdc["inbound_message"] < c._debug0+1: - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect)) + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) + d.addCallback(lambda _ign: log.msg(" reconnected")) + # TODO: publish something while the introducer is offline, then + # confirm it gets delivered when the connection is reestablished def _check2(res): log.msg("doing _check2") # assert that the introducer sent out new messages, one per # subscriber dc = introducer._debug_counts - self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) - self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) - self.failUnlessEqual(dc["inbound_update"], 0) - self.failUnlessEqual(dc["outbound_message"], - introducer._debug0 + len(subscribing_clients)) - for c in clients: - self.failUnless(c.connected_to_introducer()) + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) + self.failUnless(dc["outbound_message"] > 0) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) for c in subscribing_clients: cdc = c._debug_counts - self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) + self.failUnlessEqual(cdc["inbound_message"], 1) + self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) + self.failUnlessEqual(cdc["new_announcement"], 0) + self.failUnlessEqual(cdc["wrong_service"], 0) + self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) d.addCallback(_check2) # Then force an introducer restart, by shutting down the Tub, @@ -267,71 +636,211 @@ class SystemTest(SystemTestMixin, unittest.TestCase): d.addCallback(lambda _ign: log.msg("shutting down introducer")) d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) - d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) + d.addCallback(_wait_for_introducer_loss) + d.addCallback(lambda _ign: log.msg("introducer lost")) def _restart_introducer(_ign): log.msg("restarting introducer") self.create_tub(self.central_portnum) - - for c in subscribing_clients: - # record some counters for later comparison. Stash the values - # on the client itself, because I'm lazy. - cdc = c._debug_counts - c._debug1 = cdc["inbound_announcement"] - c._debug2 = cdc["inbound_message"] - c._debug3 = cdc["new_announcement"] - newintroducer = create_introducer() - self.expected_message_count = NUM_SERVERS - self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) - self.expected_subscribe_count = len(subscribing_clients) - newfurl = self.central_tub.registerReference(newintroducer, + # reset counters + for i in range(NUM_CLIENTS): + c = subscribing_clients[i] + for k in c._debug_counts: + c._debug_counts[k] = 0 + expected_announcements[i] += 1 # new 'storage' for everyone + if server_version == V1: + introducer = old.IntroducerService_v1() + else: + introducer = IntroducerService() + newfurl = self.central_tub.registerReference(introducer, furlFile=iff) assert newfurl == self.introducer_furl d.addCallback(_restart_introducer) - def _wait_for_introducer_reconnect2(): - # wait until: - # all clients are connected - # the introducer has received publish messages from all of them - # the introducer has received subscribe messages from all of them - # the introducer has sent announcements for everybody to everybody - # all clients have received all the (duplicate) announcements - # at that point, the system should be quiescent - dc = introducer._debug_counts - for c in clients: - if not c.connected_to_introducer(): - return False - if dc["inbound_message"] < self.expected_message_count: - return False - if dc["outbound_announcements"] < self.expected_announcement_count: - return False - if dc["inbound_subscribe"] < self.expected_subscribe_count: - return False - for c in subscribing_clients: - cdc = c._debug_counts - if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS: - return False - return True - d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2)) + + d.addCallback(_wait_for_connected) + d.addCallback(_wait_for_expected_announcements) + d.addCallback(_wait_until_idle) def _check3(res): log.msg("doing _check3") - for c in clients: - self.failUnless(c.connected_to_introducer()) + dc = introducer._debug_counts + self.failUnlessEqual(dc["outbound_announcements"], + NUM_STORAGE*NUM_CLIENTS) + self.failUnless(dc["outbound_message"] > 0) + self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) for c in subscribing_clients: cdc = c._debug_counts - self.failUnless(cdc["inbound_announcement"] > c._debug1) - self.failUnless(cdc["inbound_message"] > c._debug2) - # there should have been no new announcements - self.failUnlessEqual(cdc["new_announcement"], c._debug3) - # and the right number of duplicate ones. There were - # NUM_SERVERS from the servertub restart, and there should be - # another NUM_SERVERS now - self.failUnlessEqual(cdc["duplicate_announcement"], - 2*NUM_SERVERS) + self.failUnless(cdc["inbound_message"] > 0) + self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) + self.failUnlessEqual(cdc["new_announcement"], 0) + self.failUnlessEqual(cdc["wrong_service"], 0) + self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) d.addCallback(_check3) return d + + def test_system_v2_server(self): + self.basedir = "introducer/SystemTest/system_v2_server" + os.makedirs(self.basedir) + return self.do_system_test(V2) + test_system_v2_server.timeout = 480 + # occasionally takes longer than 350s on "draco" + + def test_system_v1_server(self): + self.basedir = "introducer/SystemTest/system_v1_server" + os.makedirs(self.basedir) + return self.do_system_test(V1) + test_system_v1_server.timeout = 480 + # occasionally takes longer than 350s on "draco" + +class FakeRemoteReference: + def notifyOnDisconnect(self, *args, **kwargs): pass + def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y" + +class ClientInfo(unittest.TestCase): + def test_client_v2(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + "my_version", "oldest", app_versions) + #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + #ann_s = make_ann_t(client_v2, furl1, None) + #introducer.remote_publish_v2(ann_s, Referenceable()) + subscriber = FakeRemoteReference() + introducer.remote_subscribe_v2(subscriber, "storage", + client_v2._my_subscriber_info) + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + self.failUnlessIdentical(rref, subscriber) + self.failUnlessEqual(sn, "storage") + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + self.failUnlessEqual(si["app-versions"], app_versions) + self.failUnlessEqual(si["nickname"], u"nick-v2") + self.failUnlessEqual(si["my-version"], "my_version") + + def test_client_v1(self): + introducer = IntroducerService() + subscriber = FakeRemoteReference() + introducer.remote_subscribe(subscriber, "storage") + # the v1 subscribe interface had no subscriber_info: that was usually + # sent in a separate stub_client pseudo-announcement + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + # rref will be a WrapV1SubscriberInV2Interface around the real + # subscriber + self.failUnlessIdentical(rref.original, subscriber) + self.failUnlessEqual(si, None) # not known yet + self.failUnlessEqual(sn, "storage") + + # now submit the stub_client announcement + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + ann = (furl1, "stub_client", "RIStubClient", + u"nick-v1".encode("utf-8"), "my_version", "oldest") + introducer.remote_publish(ann) + # the server should correlate the two + s = introducer.get_subscribers() + self.failUnlessEqual(len(s), 1) + sn, when, si, rref = s[0] + self.failUnlessIdentical(rref.original, subscriber) + self.failUnlessEqual(sn, "storage") + + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + # v1 announcements do not contain app-versions + self.failUnlessEqual(si["app-versions"], {}) + self.failUnlessEqual(si["nickname"], u"nick-v1") + self.failUnlessEqual(si["my-version"], "my_version") + + # a subscription that arrives after the stub_client announcement + # should be correlated too + subscriber2 = FakeRemoteReference() + introducer.remote_subscribe(subscriber2, "thing2") + + s = introducer.get_subscribers() + subs = dict([(sn, (si,rref)) for sn, when, si, rref in s]) + self.failUnlessEqual(len(subs), 2) + (si,rref) = subs["thing2"] + self.failUnlessIdentical(rref.original, subscriber2) + self.failUnlessEqual(si["version"], 0) + self.failUnlessEqual(si["oldest-supported"], "oldest") + # v1 announcements do not contain app-versions + self.failUnlessEqual(si["app-versions"], {}) + self.failUnlessEqual(si["nickname"], u"nick-v1") + self.failUnlessEqual(si["my-version"], "my_version") + +class Announcements(unittest.TestCase): + def test_client_v2_unsigned(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + "my_version", "oldest", app_versions) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y" + ann_s0 = make_ann_t(client_v2, furl1, None) + canary0 = Referenceable() + introducer.remote_publish_v2(ann_s0, canary0) + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann, when)) = a.items()[0] + self.failUnlessIdentical(canary, canary0) + self.failUnlessEqual(index, ("storage", None, tubid)) + self.failUnlessEqual(ann["app-versions"], app_versions) + self.failUnlessEqual(ann["nickname"], u"nick-v2") + self.failUnlessEqual(ann["service-name"], "storage") + self.failUnlessEqual(ann["my-version"], "my_version") + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) + + def test_client_v2_signed(self): + introducer = IntroducerService() + tub = introducer_furl = None + app_versions = {"whizzy": "fizzy"} + client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", + "my_version", "oldest", app_versions) + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + sk_s, vk_s = keyutil.make_keypair() + sk, _ignored = keyutil.parse_privkey(sk_s) + pks = keyutil.remove_prefix(vk_s, "pub-") + ann_t0 = make_ann_t(client_v2, furl1, sk) + canary0 = Referenceable() + introducer.remote_publish_v2(ann_t0, canary0) + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann, when)) = a.items()[0] + self.failUnlessIdentical(canary, canary0) + self.failUnlessEqual(index, ("storage", pks, None)) + self.failUnlessEqual(ann["app-versions"], app_versions) + self.failUnlessEqual(ann["nickname"], u"nick-v2") + self.failUnlessEqual(ann["service-name"], "storage") + self.failUnlessEqual(ann["my-version"], "my_version") + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) + + def test_client_v1(self): + introducer = IntroducerService() + + furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" + tubid = "62ubehyunnyhzs7r6vdonnm2hpi52w6y" + ann = (furl1, "storage", "RIStorage", + u"nick-v1".encode("utf-8"), "my_version", "oldest") + introducer.remote_publish(ann) + + a = introducer.get_announcements() + self.failUnlessEqual(len(a), 1) + (index, (ann_s, canary, ann, when)) = a.items()[0] + self.failUnlessEqual(canary, None) + self.failUnlessEqual(index, ("storage", None, tubid)) + self.failUnlessEqual(ann["app-versions"], {}) + self.failUnlessEqual(ann["nickname"], u"nick-v1".encode("utf-8")) + self.failUnlessEqual(ann["service-name"], "storage") + self.failUnlessEqual(ann["my-version"], "my_version") + self.failUnlessEqual(ann["anonymous-storage-FURL"], furl1) + + class TooNewServer(IntroducerService): VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": { }, @@ -359,10 +868,10 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): tub.setLocation("localhost:%d" % portnum) c = IntroducerClient(tub, self.introducer_furl, - u"nickname-client", "version", "oldest") + u"nickname-client", "version", "oldest", {}) announcements = {} - def got(serverid, ann_d): - announcements[serverid] = ann_d + def got(key_s, ann): + announcements[key_s] = ann c.subscribe_to("storage", got) c.setServiceParent(self.parent) @@ -374,7 +883,8 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): d = self.poll(_got_bad) def _done(res): self.failUnless(c._introducer_error) - self.failUnless(c._introducer_error.check(InsufficientVersionError)) + self.failUnless(c._introducer_error.check(InsufficientVersionError), + c._introducer_error) d.addCallback(_done) return d @@ -388,3 +898,44 @@ class DecodeFurl(unittest.TestCase): nodeid = b32decode(m.group(1).upper()) self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") +class Signatures(unittest.TestCase): + def test_sign(self): + ann = {"key1": "value1"} + sk_s,vk_s = keyutil.make_keypair() + sk,ignored = keyutil.parse_privkey(sk_s) + ann_t = sign_to_foolscap(ann, sk) + (msg, sig, key) = ann_t + self.failUnlessEqual(type(msg), type("".encode("utf-8"))) # bytes + self.failUnlessEqual(simplejson.loads(msg.decode("utf-8")), ann) + self.failUnless(sig.startswith("v0-")) + self.failUnless(key.startswith("v0-")) + (ann2,key2) = unsign_from_foolscap(ann_t) + self.failUnlessEqual(ann2, ann) + self.failUnlessEqual("pub-"+key2, vk_s) + + # bad signature + bad_ann = {"key1": "value2"} + bad_msg = simplejson.dumps(bad_ann).encode("utf-8") + self.failUnlessRaises(keyutil.BadSignatureError, + unsign_from_foolscap, (bad_msg,sig,key)) + # sneaky bad signature should be ignored + (ann2,key2) = unsign_from_foolscap( (bad_msg,None,key) ) + self.failUnlessEqual(key2, None) + self.failUnlessEqual(ann2, bad_ann) + + # unrecognized signatures + self.failUnlessRaises(UnknownKeyError, + unsign_from_foolscap, (bad_msg,"v999-sig",key)) + self.failUnlessRaises(UnknownKeyError, + unsign_from_foolscap, (bad_msg,sig,"v999-key")) + + +# add tests of StorageFarmBroker: if it receives duplicate announcements, it +# should leave the Reconnector in place, also if it receives +# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it +# should tear down the Reconnector and make a new one. This behavior used to +# live in the IntroducerClient, and thus used to be tested by test_introducer + +# copying more tests from old branch: + +# then also add Upgrade test diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 007aa881..c8781178 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -228,7 +228,9 @@ def make_storagebroker(s=None, num_peers=10): storage_broker = StorageFarmBroker(None, True) for peerid in peerids: fss = FakeStorageServer(peerid, s) - storage_broker.test_add_rref(peerid, fss) + ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid), + "permutation-seed-base32": base32.b2a(peerid) } + storage_broker.test_add_rref(peerid, fss, ann) return storage_broker def make_nodemaker(s=None, num_peers=10): diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 3cbaa7c9..23bf1aa2 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -783,7 +783,7 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): newappverstr = "%s: %s" % (allmydata.__appname__, altverstr) self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res)) - self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res) + self.failUnless("Announcement Summary: storage: 5" in res) self.failUnless("Subscription Summary: storage: 5" in res) self.failUnless("tahoe.css" in res) except unittest.FailTest: @@ -804,9 +804,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): self.failUnlessEqual(data["subscription_summary"], {"storage": 5}) self.failUnlessEqual(data["announcement_summary"], - {"storage": 5, "stub_client": 5}) + {"storage": 5}) self.failUnlessEqual(data["announcement_distinct_hosts"], - {"storage": 1, "stub_client": 1}) + {"storage": 1}) except unittest.FailTest: print print "GET %s?t=json output was:" % self.introweb_url diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index 224b8bd5..cf050b78 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -11,7 +11,7 @@ import allmydata # for __full_version__ from allmydata import uri, monitor, client from allmydata.immutable import upload, encode from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError -from allmydata.util import log +from allmydata.util import log, base32 from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata.test.no_network import GridTestMixin @@ -197,7 +197,9 @@ class FakeClient: for fakeid in range(self.num_servers) ] self.storage_broker = StorageFarmBroker(None, permute_peers=True) for (serverid, rref) in servers: - self.storage_broker.test_add_rref(serverid, rref) + ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(serverid), + "permutation-seed-base32": base32.b2a(serverid) } + self.storage_broker.test_add_rref(serverid, rref, ann) self.last_servers = [s[1] for s in servers] def log(self, *args, **kwargs): diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py index 0e25e20a..783b2bee 100644 --- a/src/allmydata/web/introweb.py +++ b/src/allmydata/web/introweb.py @@ -34,15 +34,20 @@ class IntroducerRoot(rend.Page): def render_JSON(self, ctx): res = {} - clients = self.introducer_service.get_subscribers() - subscription_summary = dict([ (name, len(clients[name])) - for name in clients ]) - res["subscription_summary"] = subscription_summary + + counts = {} + subscribers = self.introducer_service.get_subscribers() + for (service_name, ign, ign, ign) in subscribers: + if service_name not in counts: + counts[service_name] = 0 + counts[service_name] += 1 + res["subscription_summary"] = counts announcement_summary = {} service_hosts = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - (furl, service_name, ri_name, nickname, ver, oldest) = ann + for a in self.introducer_service.get_announcements().values(): + (_, _, ann, when) = a + service_name = ann["service-name"] if service_name not in announcement_summary: announcement_summary[service_name] = 0 announcement_summary[service_name] += 1 @@ -55,6 +60,7 @@ class IntroducerRoot(rend.Page): # enough: when multiple services are run on a single host, # they're usually either configured with the same addresses, # or setLocationAutomatically picks up the same interfaces. + furl = ann["anonymous-storage-FURL"] locations = SturdyRef(furl).getTubRef().getLocations() # list of tuples, ("ipv4", host, port) host = frozenset([hint[1] @@ -79,8 +85,9 @@ class IntroducerRoot(rend.Page): def render_announcement_summary(self, ctx, data): services = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - (furl, service_name, ri_name, nickname, ver, oldest) = ann + for a in self.introducer_service.get_announcements().values(): + (_, _, ann, when) = a + service_name = ann["service-name"] if service_name not in services: services[service_name] = 0 services[service_name] += 1 @@ -90,65 +97,52 @@ class IntroducerRoot(rend.Page): for service_name in service_names]) def render_client_summary(self, ctx, data): + counts = {} clients = self.introducer_service.get_subscribers() - service_names = clients.keys() - service_names.sort() - return ", ".join(["%s: %d" % (service_name, len(clients[service_name])) - for service_name in service_names]) + for (service_name, ign, ign, ign) in clients: + if service_name not in counts: + counts[service_name] = 0 + counts[service_name] += 1 + return ", ".join([ "%s: %d" % (name, counts[name]) + for name in sorted(counts.keys()) ] ) def data_services(self, ctx, data): introsvc = self.introducer_service - ann = [(since,a) - for (a,since) in introsvc.get_announcements().values() - if a[1] != "stub_client"] - ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) - return ann - - def render_service_row(self, ctx, (since,announcement)): - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - sr = SturdyRef(furl) + services = [] + for a in introsvc.get_announcements().values(): + (_, _, ann, when) = a + if ann["service-name"] == "stub_client": + continue + services.append( (when, ann) ) + services.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"])) + # this used to be: + #services.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) + # service_name was the primary key, then the whole tuple (starting + # with the furl) was the secondary key + return services + + def render_service_row(self, ctx, (since,ann)): + sr = SturdyRef(ann["anonymous-storage-FURL"]) nodeid = sr.tubID advertised = self.show_location_hints(sr) ctx.fillSlots("peerid", nodeid) - ctx.fillSlots("nickname", nickname) + ctx.fillSlots("nickname", ann["nickname"]) ctx.fillSlots("advertised", " ".join(advertised)) ctx.fillSlots("connected", "?") TIME_FORMAT = "%H:%M:%S %d-%b-%Y" ctx.fillSlots("announced", time.strftime(TIME_FORMAT, time.localtime(since))) - ctx.fillSlots("version", ver) - ctx.fillSlots("service_name", service_name) + ctx.fillSlots("version", ann["my-version"]) + ctx.fillSlots("service_name", ann["service-name"]) return ctx.tag def data_subscribers(self, ctx, data): - # use the "stub_client" announcements to get information per nodeid - clients = {} - for (ann,when) in self.introducer_service.get_announcements().values(): - if ann[1] != "stub_client": - continue - (furl, service_name, ri_name, nickname, ver, oldest) = ann - sr = SturdyRef(furl) - nodeid = sr.tubID - clients[nodeid] = ann - - # then we actually provide information per subscriber - s = [] - introsvc = self.introducer_service - for service_name, subscribers in introsvc.get_subscribers().items(): - for (rref, timestamp) in subscribers.items(): - sr = rref.getSturdyRef() - nodeid = sr.tubID - ann = clients.get(nodeid) - s.append( (service_name, rref, timestamp, ann) ) - s.sort() - return s + return self.introducer_service.get_subscribers() def render_subscriber_row(self, ctx, s): - (service_name, rref, since, ann) = s - nickname = "?" - version = "?" - if ann: - (furl, service_name_2, ri_name, nickname, version, oldest) = ann + (service_name, since, info, rref) = s + nickname = info.get("nickname", "?") + version = info.get("my-version", "?") sr = rref.getSturdyRef() # if the subscriber didn't do Tub.setLocation, nodeid will be None