from base64 import b32decode
from zope.interface import implements
from twisted.application import service
-from twisted.internet import defer
from foolscap import Referenceable
from allmydata import node
from allmydata.interfaces import RIIntroducerPublisherAndSubscriberService, \
RIIntroducerSubscriberClient, IIntroducerClient
-from allmydata.util import observer, log, idlib
+from allmydata.util import log, idlib
class IntroducerNode(node.Node):
PORTNUMFILE = "introducer.port"
self._encoding_parameters)
-class PeerCountObserver:
- # This is used by unit test code to wait until peer connections have been
- # established.
-
- def __init__(self):
- # The N'th element of _observers_of_enough_peers is None if nobody has
- # asked to be informed when N peers become connected, it is a
- # OneShotObserverList if someone has asked to be informed, and that list
- # is fired when N peers next become connected (or immediately if N peers
- # are already connected when they asked), and the N'th element is
- # replaced by None when the number of connected peers falls below N.
- # _observers_of_enough_peers is always just long enough to hold the
- # highest-numbered N that anyone is interested in (i.e., there are never
- # trailing Nones in _observers_of_enough_peers).
- self._observers_of_enough_peers = []
- # The N'th element of _observers_of_fewer_than_peers is None if nobody
- # has asked to be informed when we become connected to fewer than N
- # peers, it is a OneShotObserverList if someone has asked to be
- # informed, and that list is fired when we become connected to fewer
- # than N peers (or immediately if we are already connected to fewer than
- # N peers when they asked). _observers_of_fewer_than_peers is always
- # just long enough to hold the highest-numbered N that anyone is
- # interested in (i.e., there are never trailing Nones in
- # _observers_of_fewer_than_peers).
- self._observers_of_fewer_than_peers = []
- self.connection_observers = observer.ObserverList()
-
- def _notify_observers_of_enough_peers(self, numpeers):
- if len(self._observers_of_enough_peers) > numpeers:
- osol = self._observers_of_enough_peers[numpeers]
- if osol:
- osol.fire(None)
-
- def _remove_observers_of_enough_peers(self, numpeers):
- if len(self._observers_of_enough_peers) > numpeers:
- self._observers_of_enough_peers[numpeers] = None
- while self._observers_of_enough_peers and (not self._observers_of_enough_peers[-1]):
- self._observers_of_enough_peers.pop()
-
- def _notify_observers_of_fewer_than_peers(self, numpeers):
- if len(self._observers_of_fewer_than_peers) > numpeers:
- osol = self._observers_of_fewer_than_peers[numpeers]
- if osol:
- osol.fire(None)
- self._observers_of_fewer_than_peers[numpeers] = None
- while len(self._observers_of_fewer_than_peers) > numpeers and (not self._observers_of_fewer_than_peers[-1]):
- self._observers_of_fewer_than_peers.pop()
-
- def when_enough_peers(self, numpeers):
- """
- I return a deferred that fires the next time that at least
- numpeers are connected, or fires immediately if numpeers are
- currently connected.
- """
- self._observers_of_enough_peers.extend([None]*(numpeers+1-len(self._observers_of_enough_peers)))
- if not self._observers_of_enough_peers[numpeers]:
- self._observers_of_enough_peers[numpeers] = observer.OneShotObserverList()
- if len(self.connections) >= numpeers:
- self._observers_of_enough_peers[numpeers].fire(self)
- return self._observers_of_enough_peers[numpeers].when_fired()
-
- def when_fewer_than_peers(self, numpeers):
- """
- I return a deferred that fires the next time that fewer than numpeers
- are connected, or fires immediately if fewer than numpeers are currently
- connected.
- """
- if len(self.connections) < numpeers:
- return defer.succeed(None)
- else:
- self._observers_of_fewer_than_peers.extend([None]*(numpeers+1-len(self._observers_of_fewer_than_peers)))
- if not self._observers_of_fewer_than_peers[numpeers]:
- self._observers_of_fewer_than_peers[numpeers] = observer.OneShotObserverList()
- return self._observers_of_fewer_than_peers[numpeers].when_fired()
-
- def notify_on_new_connection(self, cb):
- """Register a callback that will be fired (with nodeid, rref) when
- a new connection is established."""
- self.connection_observers.subscribe(cb)
-
- def add_peer(self, ann):
- self._notify_observers_of_enough_peers(len(self.connections))
- self._notify_observers_of_fewer_than_peers(len(self.connections))
-
- def remove_peer(self, ann):
- self._remove_observers_of_enough_peers(len(self.connections))
- self._notify_observers_of_fewer_than_peers(len(self.connections)+1)
-
-
class RemoteServiceConnector:
"""I hold information about a peer service that we want to connect to. If
# self._connections is a set of (peerid, service_name, rref) tuples
self._connections = set()
- #self.counter = PeerCountObserver()
self.counter = 0 # incremented each time we change state, for tests
self.encoding_parameters = None