From: Brian Warner Date: Wed, 24 Jun 2009 19:40:38 +0000 (-0700) Subject: remove introducer/old.py, will create something similar when the RIIntroducer changes X-Git-Tag: trac-4000~66 X-Git-Url: https://git.rkrishnan.org/specifications/components/com_hotproperty/reliability?a=commitdiff_plain;h=837733641f9da4e42308e45376fa148e54d3b80c;p=tahoe-lafs%2Ftahoe-lafs.git remove introducer/old.py, will create something similar when the RIIntroducer changes --- diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py deleted file mode 100644 index 831ddc6c..00000000 --- a/src/allmydata/introducer/old.py +++ /dev/null @@ -1,356 +0,0 @@ - -# We keep a copy of the old introducer (both client and server) here to -# support compatibility tests. The old client is supposed to handle the new -# server, and new client is supposed to handle the old server. - -import re, time, sha -from base64 import b32decode -from zope.interface import implements -from twisted.application import service -from foolscap.api import Referenceable -from allmydata.util import log, idlib -from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ - IIntroducerClient, RIIntroducerPublisherAndSubscriberService - -def make_index(announcement): - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - m = re.match(r'pb://(\w+)@', furl) - assert m - nodeid = b32decode(m.group(1).upper()) - return (nodeid, service_name) - -class RemoteServiceConnector: - """I hold information about a peer service that we want to connect to. If - we are connected, I hold the RemoteReference, the peer's address, and the - peer's version information. I remember information about when we were - last connected to the peer too, even if we aren't currently connected. - - @ivar announcement_time: when we first heard about this service - @ivar last_connect_time: when we last established a connection - @ivar last_loss_time: when we last lost a connection - - @ivar version: the peer's version, from the most recent announcement - @ivar oldest_supported: the peer's oldest supported version, same - @ivar nickname: the peer's self-reported nickname, same - - @ivar rref: the RemoteReference, if connected, otherwise None - @ivar remote_host: the IAddress, if connected, otherwise None - """ - - def __init__(self, announcement, tub, ic): - self._tub = tub - self._announcement = announcement - self._ic = ic - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - - self._furl = furl - m = re.match(r'pb://(\w+)@', furl) - assert m - self._nodeid = b32decode(m.group(1).upper()) - self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid) - - self.service_name = service_name - - self.log("attempting to connect to %s" % self._nodeid_s) - self.announcement_time = time.time() - self.last_loss_time = None - self.rref = None - self.remote_host = None - self.last_connect_time = None - self.version = ver - self.oldest_supported = oldest - self.nickname = nickname - - def log(self, *args, **kwargs): - return self._ic.log(*args, **kwargs) - - def startConnecting(self): - self._reconnector = self._tub.connectTo(self._furl, self._got_service) - - def stopConnecting(self): - self._reconnector.stopConnecting() - - def _got_service(self, rref): - self.last_connect_time = time.time() - self.remote_host = rref.tracker.broker.transport.getPeer() - - self.rref = rref - self.log("connected to %s" % self._nodeid_s) - - self._ic.add_connection(self._nodeid, self.service_name, rref) - - rref.notifyOnDisconnect(self._lost, rref) - - def _lost(self, rref): - self.log("lost connection to %s" % self._nodeid_s) - self.last_loss_time = time.time() - self.rref = None - self.remote_host = None - self._ic.remove_connection(self._nodeid, self.service_name, rref) - - def reset(self): - self._reconnector.reset() - - -class IntroducerClient_V1(service.Service, Referenceable): - implements(RIIntroducerSubscriberClient, IIntroducerClient) - - def __init__(self, tub, introducer_furl, - nickname, my_version, oldest_supported): - self._tub = tub - self.introducer_furl = introducer_furl - - self._nickname = nickname - self._my_version = my_version - self._oldest_supported = oldest_supported - - self._published_announcements = set() - - self._publisher = None - self._connected = False - - self._subscribed_service_names = set() - self._subscriptions = set() # requests we've actually sent - self._received_announcements = set() - # TODO: this set will grow without bound, until the node is restarted - - # we only accept one announcement per (peerid+service_name) pair. - # This insures that an upgraded host replace their previous - # announcement. It also means that each peer must have their own Tub - # (no sharing), which is slightly weird but consistent with the rest - # of the Tahoe codebase. - self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector - # self._connections is a set of (peerid, service_name, rref) tuples - self._connections = set() - - self.counter = 0 # incremented each time we change state, for tests - self.encoding_parameters = None - - def startService(self): - service.Service.startService(self) - rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) - self._introducer_reconnector = rc - def connect_failed(failure): - self.log("Initial Introducer connection failed: perhaps it's down", - level=log.WEIRD, failure=failure) - d = self._tub.getReference(self.introducer_furl) - d.addErrback(connect_failed) - - def _got_introducer(self, publisher): - self.log("connected to introducer") - self._connected = True - self._publisher = publisher - publisher.notifyOnDisconnect(self._disconnected) - self._maybe_publish() - self._maybe_subscribe() - - def _disconnected(self): - self.log("bummer, we've lost our connection to the introducer") - self._connected = False - self._publisher = None - self._subscriptions.clear() - - def stopService(self): - service.Service.stopService(self) - self._introducer_reconnector.stopConnecting() - for rsc in self._connectors.itervalues(): - rsc.stopConnecting() - - def log(self, *args, **kwargs): - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" - return log.msg(*args, **kwargs) - - - def publish(self, furl, service_name, remoteinterface_name): - ann = (furl, service_name, remoteinterface_name, - self._nickname, self._my_version, self._oldest_supported) - self._published_announcements.add(ann) - self._maybe_publish() - - def subscribe_to(self, service_name): - self._subscribed_service_names.add(service_name) - self._maybe_subscribe() - - def _maybe_subscribe(self): - if not self._publisher: - self.log("want to subscribe, but no introducer yet", - level=log.NOISY) - return - for service_name in self._subscribed_service_names: - if service_name not in self._subscriptions: - # there is a race here, but the subscription desk ignores - # duplicate requests. - self._subscriptions.add(service_name) - d = self._publisher.callRemote("subscribe", self, service_name) - d.addErrback(log.err, facility="tahoe.introducer", - level=log.WEIRD) - - def _maybe_publish(self): - if not self._publisher: - self.log("want to publish, but no introducer yet", level=log.NOISY) - return - # this re-publishes everything. The Introducer ignores duplicates - for ann in self._published_announcements: - d = self._publisher.callRemote("publish", ann) - d.addErrback(log.err, facility="tahoe.introducer", - level=log.WEIRD) - - - - def remote_announce(self, announcements): - for ann in announcements: - self.log("received %d announcements" % len(announcements)) - (furl, service_name, ri_name, nickname, ver, oldest) = ann - if service_name not in self._subscribed_service_names: - self.log("announcement for a service we don't care about [%s]" - % (service_name,), level=log.WEIRD) - continue - if ann in self._received_announcements: - self.log("ignoring old announcement: %s" % (ann,), - level=log.NOISY) - continue - self.log("new announcement[%s]: %s" % (service_name, ann)) - self._received_announcements.add(ann) - self._new_announcement(ann) - - def _new_announcement(self, announcement): - # this will only be called for new announcements - index = make_index(announcement) - if index in self._connectors: - self.log("replacing earlier announcement", level=log.NOISY) - self._connectors[index].stopConnecting() - rsc = RemoteServiceConnector(announcement, self._tub, self) - self._connectors[index] = rsc - rsc.startConnecting() - - def add_connection(self, nodeid, service_name, rref): - self._connections.add( (nodeid, service_name, rref) ) - self.counter += 1 - # when one connection is established, reset the timers on all others, - # to trigger a reconnection attempt in one second. This is intended - # to accelerate server connections when we've been offline for a - # while. The goal is to avoid hanging out for a long time with - # connections to only a subset of the servers, which would increase - # the chances that we'll put shares in weird places (and not update - # existing shares of mutable files). See #374 for more details. - for rsc in self._connectors.values(): - rsc.reset() - - def remove_connection(self, nodeid, service_name, rref): - self._connections.discard( (nodeid, service_name, rref) ) - self.counter += 1 - - - def get_all_connections(self): - return frozenset(self._connections) - - def get_all_connectors(self): - return self._connectors.copy() - - def get_all_peerids(self): - return frozenset([peerid - for (peerid, service_name, rref) - in self._connections]) - - def get_nickname_for_peerid(self, peerid): - for k in self._connectors: - (peerid0, svcname0) = k - if peerid0 == peerid: - rsc = self._connectors[k] - return rsc.nickname - return None - - def get_all_connections_for(self, service_name): - return frozenset([c - for c in self._connections - if c[1] == service_name]) - - def get_permuted_peers(self, service_name, key): - """Return an ordered list of (peerid, rref) tuples.""" - - results = [] - for (c_peerid, c_service_name, rref) in self._connections: - assert isinstance(c_peerid, str) - if c_service_name != service_name: - continue - permuted = sha.new(key + c_peerid).digest() - results.append((permuted, c_peerid, rref)) - - results.sort(lambda a,b: cmp(a[0], b[0])) - return [ (r[1], r[2]) for r in results ] - - - - def remote_set_encoding_parameters(self, parameters): - self.encoding_parameters = parameters - - def connected_to_introducer(self): - return self._connected - - def debug_disconnect_from_peerid(self, victim_nodeid): - # for unit tests: locate and sever all connections to the given - # peerid. - for (nodeid, service_name, rref) in self._connections: - if nodeid == victim_nodeid: - rref.tracker.broker.transport.loseConnection() - - -class IntroducerService_V1(service.MultiService, Referenceable): - implements(RIIntroducerPublisherAndSubscriberService) - name = "introducer" - - def __init__(self, basedir="."): - service.MultiService.__init__(self) - self.introducer_url = None - # 'index' is (tubid, service_name) - self._announcements = {} # dict of index -> (announcement, timestamp) - self._subscribers = {} # dict of (rref->timestamp) dicts - - def log(self, *args, **kwargs): - if "facility" not in kwargs: - kwargs["facility"] = "tahoe.introducer" - return log.msg(*args, **kwargs) - - def get_announcements(self): - return self._announcements - def get_subscribers(self): - return self._subscribers - - def remote_publish(self, announcement): - self.log("introducer: announcement published: %s" % (announcement,) ) - index = make_index(announcement) - if index in self._announcements: - (old_announcement, timestamp) = self._announcements[index] - if old_announcement == announcement: - self.log("but we already knew it, ignoring", level=log.NOISY) - return - else: - self.log("old announcement being updated", level=log.NOISY) - self._announcements[index] = (announcement, time.time()) - (furl, service_name, ri_name, nickname, ver, oldest) = announcement - for s in self._subscribers.get(service_name, []): - s.callRemote("announce", set([announcement])) - - def remote_subscribe(self, subscriber, service_name): - self.log("introducer: subscription[%s] request at %s" % (service_name, - subscriber)) - if service_name not in self._subscribers: - self._subscribers[service_name] = {} - subscribers = self._subscribers[service_name] - if subscriber in subscribers: - self.log("but they're already subscribed, ignoring", - level=log.UNUSUAL) - return - subscribers[subscriber] = time.time() - def _remove(): - self.log("introducer: unsubscribing[%s] %s" % (service_name, - subscriber)) - subscribers.pop(subscriber, None) - subscriber.notifyOnDisconnect(_remove) - - announcements = set( [ ann - for idx,(ann,when) in self._announcements.items() - if idx[1] == service_name] ) - d = subscriber.callRemote("announce", announcements) - d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL) diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index d5c59e09..7958158d 100644 --- a/src/allmydata/test/test_introducer.py +++ b/src/allmydata/test/test_introducer.py @@ -1,6 +1,6 @@ -from base64 import b32decode -import os +import os, re +from base64 import b32decode from twisted.trial import unittest from twisted.internet import defer @@ -13,7 +13,6 @@ from allmydata.introducer.client import IntroducerClient from allmydata.introducer.server import IntroducerService # test compatibility with old introducer .tac files from allmydata.introducer import IntroducerNode -from allmydata.introducer import old from allmydata.util import pollmixin import common_util as testutil @@ -206,7 +205,6 @@ class SystemTest(SystemTestMixin, unittest.TestCase): def _restart_introducer_tub(_ign): log.msg("restarting introducer's Tub") - # note: old.Server doesn't have this count dc = introducer._debug_counts self.expected_count = dc["inbound_message"] + NUM_SERVERS self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 @@ -379,14 +377,13 @@ class NonV1Server(SystemTestMixin, unittest.TestCase): d.addCallback(_done) return d -class Index(unittest.TestCase): - def test_make_index(self): +class DecodeFurl(unittest.TestCase): + def test_decode(self): # make sure we have a working base64.b32decode. The one in # python2.4.[01] was broken. - ann = ('pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i', - 'storage', 'RIStorageServer.tahoe.allmydata.com', - 'plancha', 'allmydata-tahoe/1.4.1', '1.0.0') - (nodeid, service_name) = old.make_index(ann) + furl = 'pb://t5g7egomnnktbpydbuijt6zgtmw4oqi5@127.0.0.1:51857/hfzv36i' + m = re.match(r'pb://(\w+)@', furl) + assert m + nodeid = b32decode(m.group(1).upper()) self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") - self.failUnlessEqual(service_name, "storage")