import re
import time
import sha

from base64 import b32decode
from zope.interface import implements
from twisted.application import service
from foolscap import Referenceable
from allmydata.interfaces import InsufficientVersionError
from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
     IIntroducerClient
from allmydata.util import log, idlib
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.introducer.common import make_index
class RemoteServiceConnector:
    """I hold information about a peer service that we want to connect to. If
    we are connected, I hold the RemoteReference, the peer's address, and the
    peer's version information. I remember information about when we were
    last connected to the peer too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the peer's version, from the most recent announcement
    @ivar oldest_supported: the peer's oldest supported version, same
    @ivar nickname: the peer's self-reported nickname, same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """

    # version blobs assumed for remote services that predate get_version():
    # such services speak (at most) the oldest (v1) protocol
    VERSION_DEFAULTS = {
        "storage": { "http://allmydata.org/tahoe/protocols/storage/v1" :
                     { "maximum-immutable-share-size": 2**32,
                       "tolerates-immutable-read-overrun": False,
                       "delete-mutable-shares-with-zero-length-writev": False,
                       },
                     "application-version": "unknown: no get_version()",
                     },
        }

    def __init__(self, announcement, tub, ic):
        self._tub = tub
        self._announcement = announcement
        self._ic = ic
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement

        self._furl = furl
        # the peerid is the base32-encoded tubid embedded in the FURL
        m = re.match(r'pb://(\w+)@', furl)
        assert m
        self._nodeid = b32decode(m.group(1).upper())
        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)

        self.service_name = service_name

        self.log("attempting to connect to %s" % self._nodeid_s)
        self.announcement_time = time.time()
        self.last_loss_time = None
        self.rref = None
        self.remote_host = None
        self.last_connect_time = None
        self.version = ver
        self.oldest_supported = oldest
        self.nickname = nickname

    def log(self, *args, **kwargs):
        # delegate to the IntroducerClient's logger (tahoe.introducer facility)
        return self._ic.log(*args, **kwargs)

    def startConnecting(self):
        # the Reconnector keeps trying until stopConnecting() is called
        self._reconnector = self._tub.connectTo(self._furl, self._got_service)

    def stopConnecting(self):
        self._reconnector.stopConnecting()

    def _got_service(self, rref):
        # we have a raw connection: now ask the peer for its version,
        # substituting VERSION_DEFAULTS if it predates get_version()
        self.log("got connection to %s, getting versions" % self._nodeid_s)

        default = self.VERSION_DEFAULTS.get(self.service_name, {})
        d = get_versioned_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service)

    def _got_versioned_service(self, rref):
        self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))

        self.last_connect_time = time.time()
        # note: rref is a versioned wrapper; the raw RemoteReference is rref.rref
        self.remote_host = rref.rref.tracker.broker.transport.getPeer()

        self.rref = rref

        self._ic.add_connection(self._nodeid, self.service_name, rref)

        rref.notifyOnDisconnect(self._lost, rref)

    def _lost(self, rref):
        self.log("lost connection to %s" % self._nodeid_s)
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None
        self._ic.remove_connection(self._nodeid, self.service_name, rref)

    def reset(self):
        # schedule the next reconnection attempt for one second from now
        self._reconnector.reset()
class IntroducerClient(service.Service, Referenceable):
    """I live inside a Tahoe node. I connect to the introducer, publish
    announcements for my node's local services, and subscribe to hear about
    remote services. For each remote announcement I accept, I create a
    RemoteServiceConnector that maintains a connection to that service.
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        self._tub = tub
        self.introducer_furl = introducer_furl

        self._nickname = nickname.encode("utf-8")
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        # (furl, service_name, ri_name, nickname, version, oldest) tuples
        # that we want the introducer to relay to other nodes
        self._published_announcements = set()

        self._publisher = None
        self._connected = False

        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent
        self._received_announcements = set()
        # TODO: this set will grow without bound, until the node is restarted

        # we only accept one announcement per (peerid+service_name) pair.
        # This insures that an upgraded host replaces their previous
        # announcement. It also means that each peer must have their own Tub
        # (no sharing), which is slightly weird but consistent with the rest
        # of the Tahoe codebase.
        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
        # self._connections is a set of (peerid, service_name, rref) tuples
        self._connections = set()

        self.counter = 0 # incremented each time we change state, for tests
        self.encoding_parameters = None

    def startService(self):
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # log (once) if the very first connection attempt fails; the
        # Reconnector above will keep retrying quietly afterwards
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        self.log("connected to introducer, getting versions")
        # if the introducer predates get_version(), assume it speaks
        # (exactly) the v1 protocol
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._connected = True
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        self.log("bummer, we've lost our connection to the introducer")
        self._connected = False
        self._publisher = None
        # our subscriptions died with the connection: re-send them after
        # we reconnect
        self._subscriptions.clear()

    def stopService(self):
        service.Service.stopService(self)
        self._introducer_reconnector.stopConnecting()
        for rsc in self._connectors.itervalues():
            rsc.stopConnecting()

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)

    def publish(self, furl, service_name, remoteinterface_name):
        """Record an announcement for one of our local services, and send it
        to the introducer (now if we are connected, otherwise upon the next
        connection)."""
        ann = (furl, service_name, remoteinterface_name,
               self._nickname, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name):
        """Ask the introducer to send us announcements for this service."""
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()

    def _maybe_subscribe(self):
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(log.err, facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")

    def remote_announce(self, announcements):
        # log once per batch, not once per announcement
        self.log("received %d announcements" % len(announcements))
        for ann in announcements:
            (furl, service_name, ri_name, nickname, ver, oldest) = ann
            if service_name not in self._subscribed_service_names:
                self.log("announcement for a service we don't care about [%s]"
                         % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
                continue
            if ann in self._received_announcements:
                self.log("ignoring old announcement: %s" % (ann,),
                         level=log.NOISY)
                continue
            self.log("new announcement[%s]: %s" % (service_name, ann))
            self._received_announcements.add(ann)
            self._new_announcement(ann)

    def _new_announcement(self, announcement):
        # this will only be called for new announcements
        index = make_index(announcement)
        if index in self._connectors:
            self.log("replacing earlier announcement", level=log.NOISY)
            self._connectors[index].stopConnecting()
        rsc = RemoteServiceConnector(announcement, self._tub, self)
        self._connectors[index] = rsc
        rsc.startConnecting()

    def add_connection(self, nodeid, service_name, rref):
        self._connections.add( (nodeid, service_name, rref) )
        self.counter += 1
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for rsc in self._connectors.values():
            rsc.reset()

    def remove_connection(self, nodeid, service_name, rref):
        self._connections.discard( (nodeid, service_name, rref) )
        self.counter += 1

    def get_all_connections(self):
        return frozenset(self._connections)

    def get_all_connectors(self):
        return self._connectors.copy()

    def get_all_peerids(self):
        return frozenset([peerid
                          for (peerid, service_name, rref)
                          in self._connections])

    def get_nickname_for_peerid(self, peerid):
        # nicknames live on the connectors, which are keyed by
        # (peerid, service_name); any service from this peer will do
        for k in self._connectors:
            (peerid0, svcname0) = k
            if peerid0 == peerid:
                rsc = self._connectors[k]
                return rsc.nickname
        return None

    def get_all_connections_for(self, service_name):
        return frozenset([c
                          for c in self._connections
                          if c[1] == service_name])

    def get_peers(self, service_name):
        """Return a set of (peerid, versioned-rref) tuples."""
        return frozenset([(peerid, r)
                          for (peerid, servname, r) in self._connections
                          if servname == service_name])

    def get_permuted_peers(self, service_name, key):
        """Return an ordered list of (peerid, versioned-rref) tuples."""
        servers = self.get_peers(service_name)
        # permute the serverlist by hashing each peerid with the given key
        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())

    def remote_set_encoding_parameters(self, parameters):
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        return self._connected

    def debug_disconnect_from_peerid(self, victim_nodeid):
        # for unit tests: locate and sever all connections to the given
        # peerid, so tests can exercise the reconnection logic
        for (nodeid, service_name, rref) in self._connections:
            if nodeid == victim_nodeid:
                rref.tracker.broker.transport.loseConnection()