3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap import Referenceable
7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
9 from allmydata.util import log, idlib
10 from allmydata.introducer.common import make_index
class RemoteServiceConnector:
    """I hold information about a peer service that we want to connect to. If
    we are connected, I hold the RemoteReference, the peer's address, and the
    peer's version information. I remember information about when we were
    last connected to the peer too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the peer's version, from the most recent announcement
    @ivar oldest_supported: the peer's oldest supported version, same
    @ivar nickname: the peer's self-reported nickname, same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """

    def __init__(self, announcement, tub, ic):
        # NOTE(review): later methods read self._tub, self._ic and self._furl,
        # but no assignment of them is visible in this chunk -- confirm the
        # constructor is not missing those lines.
        self._announcement = announcement
        # 'announcement' is the 6-tuple distributed by the introducer
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement
        # extract the base32 tubid between "pb://" and "@" and decode it to
        # obtain the binary nodeid
        m = re.match(r'pb://(\w+)@', furl)
        self._nodeid = b32decode(m.group(1).upper())
        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)  # short form, for log messages
        self.service_name = service_name
        self.log("attempting to connect to %s" % self._nodeid_s)
        self.announcement_time = time.time()
        self.last_loss_time = None
        self.remote_host = None
        self.last_connect_time = None
        self.oldest_supported = oldest
        self.nickname = nickname

    def log(self, *args, **kwargs):
        # delegate all logging to the owning IntroducerClient
        return self._ic.log(*args, **kwargs)

    def startConnecting(self):
        # foolscap Reconnector: keeps retrying until stopConnecting() is called
        self._reconnector = self._tub.connectTo(self._furl, self._got_service)

    def stopConnecting(self):
        self._reconnector.stopConnecting()

    def _got_service(self, rref):
        """Reconnector callback: we are now connected to the peer."""
        self.last_connect_time = time.time()
        # record the peer's transport address for as long as we are connected
        self.remote_host = rref.tracker.broker.transport.getPeer()
        self.log("connected to %s" % self._nodeid_s)
        self._ic.add_connection(self._nodeid, self.service_name, rref)
        # arrange for _lost() (with this same rref) to fire on disconnect
        rref.notifyOnDisconnect(self._lost, rref)

    def _lost(self, rref):
        """Disconnect callback: clear the connection-specific state."""
        self.log("lost connection to %s" % self._nodeid_s)
        self.last_loss_time = time.time()
        self.remote_host = None
        self._ic.remove_connection(self._nodeid, self.service_name, rref)

        # NOTE(review): this statement looks like it belongs in a separate
        # reset() method (add_connection() appears to reset every connector)
        # rather than at the end of _lost() -- confirm.
        self._reconnector.reset()
86 class IntroducerClient(service.Service, Referenceable):
87 implements(RIIntroducerSubscriberClient, IIntroducerClient)
    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        # NOTE(review): other methods read self._tub, but no assignment of the
        # 'tub' argument is visible in this chunk -- confirm it is not missing.
        self.introducer_furl = introducer_furl
        # identity/version info included in every announcement we publish
        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported
        # announcements we have been asked to publish; re-sent on reconnect
        self._published_announcements = set()
        # RemoteReference to the introducer, or None while disconnected
        self._publisher = None
        self._connected = False
        # service names we want announcements for, vs. requests actually sent
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent
        self._received_announcements = set()
        # TODO: this set will grow without bound, until the node is restarted
        # we only accept one announcement per (peerid+service_name) pair.
        # This insures that an upgraded host replace their previous
        # announcement. It also means that each peer must have their own Tub
        # (no sharing), which is slightly weird but consistent with the rest
        # of the Tahoe codebase.
        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
        # self._connections is a set of (peerid, service_name, rref) tuples
        self._connections = set()
        self.counter = 0 # incremented each time we change state, for tests
        self.encoding_parameters = None
120 def startService(self):
121 service.Service.startService(self)
122 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
123 self._introducer_reconnector = rc
124 def connect_failed(failure):
125 self.log("Initial Introducer connection failed: perhaps it's down",
126 level=log.WEIRD, failure=failure)
127 d = self._tub.getReference(self.introducer_furl)
128 d.addErrback(connect_failed)
130 def _got_introducer(self, publisher):
131 self.log("connected to introducer")
132 self._connected = True
133 self._publisher = publisher
134 publisher.notifyOnDisconnect(self._disconnected)
135 self._maybe_publish()
136 self._maybe_subscribe()
138 def _disconnected(self):
139 self.log("bummer, we've lost our connection to the introducer")
140 self._connected = False
141 self._publisher = None
142 self._subscriptions.clear()
    def stopService(self):
        """Stop the service: cease reconnecting to the introducer and peers."""
        service.Service.stopService(self)
        self._introducer_reconnector.stopConnecting()
        for rsc in self._connectors.itervalues():  # NOTE(review): loop body not visible in this chunk; presumably rsc.stopConnecting() -- confirm
150 def log(self, *args, **kwargs):
151 if "facility" not in kwargs:
152 kwargs["facility"] = "tahoe.introducer"
153 return log.msg(*args, **kwargs)
156 def publish(self, furl, service_name, remoteinterface_name):
157 ann = (furl, service_name, remoteinterface_name,
158 self._nickname, self._my_version, self._oldest_supported)
159 self._published_announcements.add(ann)
160 self._maybe_publish()
162 def subscribe_to(self, service_name):
163 self._subscribed_service_names.add(service_name)
164 self._maybe_subscribe()
    def _maybe_subscribe(self):
        """Send a remote 'subscribe' request for each wanted service name
        that has not already been requested on this connection."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(log.err, facility="tahoe.introducer",
    def _maybe_publish(self):
        """Push every remembered announcement to the introducer, if we are
        currently connected; otherwise just log and wait for reconnection."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(log.err, facility="tahoe.introducer",
    def remote_announce(self, announcements):
        """RIIntroducerSubscriberClient: the introducer is delivering a batch
        of announcements for services we subscribed to."""
        for ann in announcements:
            # NOTE(review): if this line is inside the loop it logs the batch
            # size once per announcement; consider hoisting it above -- confirm.
            self.log("received %d announcements" % len(announcements))
            (furl, service_name, ri_name, nickname, ver, oldest) = ann
            if service_name not in self._subscribed_service_names:
                self.log("announcement for a service we don't care about [%s]"
                         % (service_name,), level=log.WEIRD)
            if ann in self._received_announcements:
                self.log("ignoring old announcement: %s" % (ann,),
            self.log("new announcement[%s]: %s" % (service_name, ann))
            self._received_announcements.add(ann)
            self._new_announcement(ann)
208 def _new_announcement(self, announcement):
209 # this will only be called for new announcements
210 index = make_index(announcement)
211 if index in self._connectors:
212 self.log("replacing earlier announcement", level=log.NOISY)
213 self._connectors[index].stopConnecting()
214 rsc = RemoteServiceConnector(announcement, self._tub, self)
215 self._connectors[index] = rsc
216 rsc.startConnecting()
    def add_connection(self, nodeid, service_name, rref):
        """Called by a RemoteServiceConnector once it connects to its peer."""
        self._connections.add( (nodeid, service_name, rref) )
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for rsc in self._connectors.values():  # NOTE(review): loop body not visible in this chunk; presumably rsc.reset() -- confirm
    def remove_connection(self, nodeid, service_name, rref):
        """Called by a RemoteServiceConnector when its connection is lost."""
        # discard (not remove) so a duplicate notification is harmless
        self._connections.discard( (nodeid, service_name, rref) )
236 def get_all_connections(self):
237 return frozenset(self._connections)
239 def get_all_connectors(self):
240 return self._connectors.copy()
242 def get_all_peerids(self):
243 return frozenset([peerid
244 for (peerid, service_name, rref)
245 in self._connections])
    def get_all_connections_for(self, service_name):
        """Return the subset of connection tuples for the given service."""
        for c in self._connections   # c is (peerid, service_name, rref)
        if c[1] == service_name])
    def get_permuted_peers(self, service_name, key):
        """Return an ordered list of (peerid, rref) tuples."""
        # NOTE(review): 'results' is not initialized in the visible code
        # (presumably 'results = []' precedes this loop) -- confirm.
        for (c_peerid, c_service_name, rref) in self._connections:
            assert isinstance(c_peerid, str)
            if c_service_name != service_name:  # NOTE(review): branch body not visible; presumably 'continue' -- confirm
            # permute peers by hashing the caller's key with each peerid
            permuted = sha.new(key + c_peerid).digest()
            results.append((permuted, c_peerid, rref))
        # order by the permuted digest, then drop it from the result tuples
        results.sort(lambda a,b: cmp(a[0], b[0]))
        return [ (r[1], r[2]) for r in results ]
268 def remote_set_encoding_parameters(self, parameters):
269 self.encoding_parameters = parameters
271 def connected_to_introducer(self):
272 return self._connected
274 def debug_disconnect_from_peerid(self, victim_nodeid):
275 # for unit tests: locate and sever all connections to the given
277 for (nodeid, service_name, rref) in self._connections:
278 if nodeid == victim_nodeid:
279 rref.tracker.broker.transport.loseConnection()