2 # We keep a copy of the old introducer (both client and server) here to
3 # support compatibility tests. The old client is supposed to handle the new
4 # server, and new client is supposed to handle the old server.
7 from base64 import b32decode
8 from zope.interface import implements
9 from twisted.application import service
10 from foolscap import Referenceable
11 from allmydata.util import log, idlib
12 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
13 IIntroducerClient, RIIntroducerPublisherAndSubscriberService
14 from allmydata.introducer.common import make_index
16 class RemoteServiceConnector:
17 """I hold information about a peer service that we want to connect to. If
18 we are connected, I hold the RemoteReference, the peer's address, and the
19 peer's version information. I remember information about when we were
20 last connected to the peer too, even if we aren't currently connected.
22 @ivar announcement_time: when we first heard about this service
23 @ivar last_connect_time: when we last established a connection
24 @ivar last_loss_time: when we last lost a connection
26 @ivar version: the peer's version, from the most recent announcement
27 @ivar oldest_supported: the peer's oldest supported version, same
28 @ivar nickname: the peer's self-reported nickname, same
30 @ivar rref: the RemoteReference, if connected, otherwise None
31 @ivar remote_host: the IAddress, if connected, otherwise None
34 def __init__(self, announcement, tub, ic):
36 self._announcement = announcement
38 (furl, service_name, ri_name, nickname, ver, oldest) = announcement
41 m = re.match(r'pb://(\w+)@', furl)
43 self._nodeid = b32decode(m.group(1).upper())
44 self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
46 self.service_name = service_name
48 self.log("attempting to connect to %s" % self._nodeid_s)
49 self.announcement_time = time.time()
50 self.last_loss_time = None
52 self.remote_host = None
53 self.last_connect_time = None
55 self.oldest_supported = oldest
56 self.nickname = nickname
58 def log(self, *args, **kwargs):
59 return self._ic.log(*args, **kwargs)
61 def startConnecting(self):
62 self._reconnector = self._tub.connectTo(self._furl, self._got_service)
64 def stopConnecting(self):
65 self._reconnector.stopConnecting()
67 def _got_service(self, rref):
68 self.last_connect_time = time.time()
69 self.remote_host = rref.tracker.broker.transport.getPeer()
72 self.log("connected to %s" % self._nodeid_s)
74 self._ic.add_connection(self._nodeid, self.service_name, rref)
76 rref.notifyOnDisconnect(self._lost, rref)
78 def _lost(self, rref):
79 self.log("lost connection to %s" % self._nodeid_s)
80 self.last_loss_time = time.time()
82 self.remote_host = None
83 self._ic.remove_connection(self._nodeid, self.service_name, rref)
86 self._reconnector.reset()
89 class IntroducerClient_V1(service.Service, Referenceable):
90 implements(RIIntroducerSubscriberClient, IIntroducerClient)
92 def __init__(self, tub, introducer_furl,
93 nickname, my_version, oldest_supported):
95 self.introducer_furl = introducer_furl
97 self._nickname = nickname
98 self._my_version = my_version
99 self._oldest_supported = oldest_supported
101 self._published_announcements = set()
103 self._publisher = None
104 self._connected = False
106 self._subscribed_service_names = set()
107 self._subscriptions = set() # requests we've actually sent
108 self._received_announcements = set()
109 # TODO: this set will grow without bound, until the node is restarted
111 # we only accept one announcement per (peerid+service_name) pair.
112 # This insures that an upgraded host replace their previous
113 # announcement. It also means that each peer must have their own Tub
114 # (no sharing), which is slightly weird but consistent with the rest
115 # of the Tahoe codebase.
116 self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
117 # self._connections is a set of (peerid, service_name, rref) tuples
118 self._connections = set()
120 self.counter = 0 # incremented each time we change state, for tests
121 self.encoding_parameters = None
123 def startService(self):
124 service.Service.startService(self)
125 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
126 self._introducer_reconnector = rc
127 def connect_failed(failure):
128 self.log("Initial Introducer connection failed: perhaps it's down",
129 level=log.WEIRD, failure=failure)
130 d = self._tub.getReference(self.introducer_furl)
131 d.addErrback(connect_failed)
133 def _got_introducer(self, publisher):
134 self.log("connected to introducer")
135 self._connected = True
136 self._publisher = publisher
137 publisher.notifyOnDisconnect(self._disconnected)
138 self._maybe_publish()
139 self._maybe_subscribe()
141 def _disconnected(self):
142 self.log("bummer, we've lost our connection to the introducer")
143 self._connected = False
144 self._publisher = None
145 self._subscriptions.clear()
147 def stopService(self):
148 service.Service.stopService(self)
149 self._introducer_reconnector.stopConnecting()
150 for rsc in self._connectors.itervalues():
153 def log(self, *args, **kwargs):
154 if "facility" not in kwargs:
155 kwargs["facility"] = "tahoe.introducer"
156 return log.msg(*args, **kwargs)
159 def publish(self, furl, service_name, remoteinterface_name):
160 ann = (furl, service_name, remoteinterface_name,
161 self._nickname, self._my_version, self._oldest_supported)
162 self._published_announcements.add(ann)
163 self._maybe_publish()
165 def subscribe_to(self, service_name):
166 self._subscribed_service_names.add(service_name)
167 self._maybe_subscribe()
169 def _maybe_subscribe(self):
170 if not self._publisher:
171 self.log("want to subscribe, but no introducer yet",
174 for service_name in self._subscribed_service_names:
175 if service_name not in self._subscriptions:
176 # there is a race here, but the subscription desk ignores
177 # duplicate requests.
178 self._subscriptions.add(service_name)
179 d = self._publisher.callRemote("subscribe", self, service_name)
180 d.addErrback(log.err, facility="tahoe.introducer",
183 def _maybe_publish(self):
184 if not self._publisher:
185 self.log("want to publish, but no introducer yet", level=log.NOISY)
187 # this re-publishes everything. The Introducer ignores duplicates
188 for ann in self._published_announcements:
189 d = self._publisher.callRemote("publish", ann)
190 d.addErrback(log.err, facility="tahoe.introducer",
195 def remote_announce(self, announcements):
196 for ann in announcements:
197 self.log("received %d announcements" % len(announcements))
198 (furl, service_name, ri_name, nickname, ver, oldest) = ann
199 if service_name not in self._subscribed_service_names:
200 self.log("announcement for a service we don't care about [%s]"
201 % (service_name,), level=log.WEIRD)
203 if ann in self._received_announcements:
204 self.log("ignoring old announcement: %s" % (ann,),
207 self.log("new announcement[%s]: %s" % (service_name, ann))
208 self._received_announcements.add(ann)
209 self._new_announcement(ann)
211 def _new_announcement(self, announcement):
212 # this will only be called for new announcements
213 index = make_index(announcement)
214 if index in self._connectors:
215 self.log("replacing earlier announcement", level=log.NOISY)
216 self._connectors[index].stopConnecting()
217 rsc = RemoteServiceConnector(announcement, self._tub, self)
218 self._connectors[index] = rsc
219 rsc.startConnecting()
221 def add_connection(self, nodeid, service_name, rref):
222 self._connections.add( (nodeid, service_name, rref) )
224 # when one connection is established, reset the timers on all others,
225 # to trigger a reconnection attempt in one second. This is intended
226 # to accelerate server connections when we've been offline for a
227 # while. The goal is to avoid hanging out for a long time with
228 # connections to only a subset of the servers, which would increase
229 # the chances that we'll put shares in weird places (and not update
230 # existing shares of mutable files). See #374 for more details.
231 for rsc in self._connectors.values():
234 def remove_connection(self, nodeid, service_name, rref):
235 self._connections.discard( (nodeid, service_name, rref) )
239 def get_all_connections(self):
240 return frozenset(self._connections)
242 def get_all_connectors(self):
243 return self._connectors.copy()
245 def get_all_peerids(self):
246 return frozenset([peerid
247 for (peerid, service_name, rref)
248 in self._connections])
250 def get_all_connections_for(self, service_name):
252 for c in self._connections
253 if c[1] == service_name])
255 def get_permuted_peers(self, service_name, key):
256 """Return an ordered list of (peerid, rref) tuples."""
259 for (c_peerid, c_service_name, rref) in self._connections:
260 assert isinstance(c_peerid, str)
261 if c_service_name != service_name:
263 permuted = sha.new(key + c_peerid).digest()
264 results.append((permuted, c_peerid, rref))
266 results.sort(lambda a,b: cmp(a[0], b[0]))
267 return [ (r[1], r[2]) for r in results ]
271 def remote_set_encoding_parameters(self, parameters):
272 self.encoding_parameters = parameters
274 def connected_to_introducer(self):
275 return self._connected
277 def debug_disconnect_from_peerid(self, victim_nodeid):
278 # for unit tests: locate and sever all connections to the given
280 for (nodeid, service_name, rref) in self._connections:
281 if nodeid == victim_nodeid:
282 rref.tracker.broker.transport.loseConnection()
285 class IntroducerService_V1(service.MultiService, Referenceable):
286 implements(RIIntroducerPublisherAndSubscriberService)
289 def __init__(self, basedir="."):
290 service.MultiService.__init__(self)
291 self.introducer_url = None
292 # 'index' is (tubid, service_name)
293 self._announcements = {} # dict of index -> (announcement, timestamp)
294 self._subscribers = {} # dict of (rref->timestamp) dicts
296 def log(self, *args, **kwargs):
297 if "facility" not in kwargs:
298 kwargs["facility"] = "tahoe.introducer"
299 return log.msg(*args, **kwargs)
301 def get_announcements(self):
302 return self._announcements
303 def get_subscribers(self):
304 return self._subscribers
306 def remote_publish(self, announcement):
307 self.log("introducer: announcement published: %s" % (announcement,) )
308 index = make_index(announcement)
309 if index in self._announcements:
310 (old_announcement, timestamp) = self._announcements[index]
311 if old_announcement == announcement:
312 self.log("but we already knew it, ignoring", level=log.NOISY)
315 self.log("old announcement being updated", level=log.NOISY)
316 self._announcements[index] = (announcement, time.time())
317 (furl, service_name, ri_name, nickname, ver, oldest) = announcement
318 for s in self._subscribers.get(service_name, []):
319 s.callRemote("announce", set([announcement]))
321 def remote_subscribe(self, subscriber, service_name):
322 self.log("introducer: subscription[%s] request at %s" % (service_name,
324 if service_name not in self._subscribers:
325 self._subscribers[service_name] = {}
326 subscribers = self._subscribers[service_name]
327 if subscriber in subscribers:
328 self.log("but they're already subscribed, ignoring",
331 subscribers[subscriber] = time.time()
333 self.log("introducer: unsubscribing[%s] %s" % (service_name,
335 subscribers.pop(subscriber, None)
336 subscriber.notifyOnDisconnect(_remove)
338 announcements = set( [ ann
339 for idx,(ann,when) in self._announcements.items()
340 if idx[1] == service_name] )
341 d = subscriber.callRemote("announce", announcements)
342 d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)