]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/client.py
introducer: move the relevant interfaces out to introducer/interfaces.py
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / client.py
1
2 import re, time, sha
3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap import Referenceable
7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
8      IIntroducerClient
9 from allmydata.util import log, idlib
10 from allmydata.introducer.common import make_index
11
12
class RemoteServiceConnector:
    """I hold information about a peer service that we want to connect to. If
    we are connected, I hold the RemoteReference, the peer's address, and the
    peer's version information. I remember information about when we were
    last connected to the peer too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the peer's version, from the most recent announcement
    @ivar oldest_supported: the peer's oldest supported version, same
    @ivar nickname: the peer's self-reported nickname, same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """

    def __init__(self, announcement, tub, ic):
        """Record the announcement; no connection is attempted until
        startConnecting() is called.

        @param announcement: a 6-tuple of (furl, service_name, ri_name,
                             nickname, version, oldest_supported)
        @param tub: the object whose connectTo() is used to reach the peer
        @param ic: the owner; it must provide log(), add_connection() and
                   remove_connection()
        """
        self._tub = tub
        self._announcement = announcement
        self._ic = ic
        (furl, service_name, ri_name, nickname, ver, oldest) = announcement

        self._furl = furl
        # the peer's nodeid is the base32 string between 'pb://' and '@'
        m = re.match(r'pb://(\w+)@', furl)
        assert m
        self._nodeid = b32decode(m.group(1).upper())
        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)

        self.service_name = service_name

        # None until startConnecting() has been called, so stopConnecting()
        # and reset() are safe to call on a connector that never started
        self._reconnector = None

        self.log("attempting to connect to %s" % self._nodeid_s)
        self.announcement_time = time.time()
        self.last_loss_time = None
        self.rref = None
        self.remote_host = None
        self.last_connect_time = None
        self.version = ver
        self.oldest_supported = oldest
        self.nickname = nickname

    def log(self, *args, **kwargs):
        """Forward a log message to our owner's log() method."""
        return self._ic.log(*args, **kwargs)

    def startConnecting(self):
        """Ask the Tub to connect to our FURL; _got_service() is the
        callback handed to connectTo()."""
        self._reconnector = self._tub.connectTo(self._furl, self._got_service)

    def stopConnecting(self):
        """Tell the reconnector to stop. Does nothing if startConnecting()
        was never called."""
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def _got_service(self, rref):
        """Connection callback: remember the reference and the peer's
        address, tell our owner, and arrange to hear about disconnection."""
        self.last_connect_time = time.time()
        self.remote_host = rref.tracker.broker.transport.getPeer()

        self.rref = rref
        self.log("connected to %s" % self._nodeid_s)

        self._ic.add_connection(self._nodeid, self.service_name, rref)

        rref.notifyOnDisconnect(self._lost, rref)

    def _lost(self, rref):
        """Disconnect callback: forget the reference and the peer's address,
        record the time of loss, and tell our owner."""
        self.log("lost connection to %s" % self._nodeid_s)
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None
        self._ic.remove_connection(self._nodeid, self.service_name, rref)

    def reset(self):
        """Reset the reconnector. Does nothing if startConnecting() was
        never called."""
        if self._reconnector is not None:
            self._reconnector.reset()
84
85
class IntroducerClient(service.Service, Referenceable):
    """I connect to the introducer named by introducer_furl. Through it I
    publish our own service announcements and subscribe to the service
    names we care about. For each new announcement I receive, I create a
    RemoteServiceConnector, and I keep track of the connections that those
    connectors establish.

    @ivar counter: incremented each time a connection is added or removed
    @ivar encoding_parameters: the last value given to
                               remote_set_encoding_parameters(), or None
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Remember our configuration and start with empty state. Nothing
        is contacted until startService().

        @param tub: the object used for connectTo() and getReference()
        @param introducer_furl: where to find the introducer
        @param nickname: included in every announcement we publish
        @param my_version: included in every announcement we publish
        @param oldest_supported: included in every announcement we publish
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        self._nickname = nickname
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        self._published_announcements = set()

        self._publisher = None
        self._connected = False

        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent
        self._received_announcements = set()
        # TODO: this set will grow without bound, until the node is restarted

        # we only accept one announcement per (peerid+service_name) pair.
        # This ensures that an upgraded host replaces its previous
        # announcement. It also means that each peer must have their own Tub
        # (no sharing), which is slightly weird but consistent with the rest
        # of the Tahoe codebase.
        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
        # self._connections is a set of (peerid, service_name, rref) tuples
        self._connections = set()

        self.counter = 0 # incremented each time we change state, for tests
        self.encoding_parameters = None

    def startService(self):
        """Start connecting to the introducer."""
        service.Service.startService(self)
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure)
        # this second request is only used to log a failure of the initial
        # attempt; successful connections are delivered to _got_introducer
        # by the connectTo() above
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Called with the introducer's reference when a connection is
        made: remember it, then send any pending publish and subscribe
        requests."""
        self.log("connected to introducer")
        self._connected = True
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Called when the introducer connection is lost."""
        self.log("bummer, we've lost our connection to the introducer")
        self._connected = False
        self._publisher = None
        # forget which subscriptions were sent, so _maybe_subscribe() will
        # send them again once we have a publisher again
        self._subscriptions.clear()

    def stopService(self):
        """Stop reconnecting to the introducer and to every known peer
        service."""
        service.Service.stopService(self)
        self._introducer_reconnector.stopConnecting()
        for rsc in self._connectors.values():
            rsc.stopConnecting()

    def log(self, *args, **kwargs):
        """Log a message, defaulting the facility to 'tahoe.introducer'."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Remember an announcement for one of our own services, and send
        it if we currently have an introducer."""
        ann = (furl, service_name, remoteinterface_name,
               self._nickname, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name):
        """Remember that we want announcements for service_name, and send
        the subscription if we currently have an introducer."""
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()

    def _maybe_subscribe(self):
        """Send a 'subscribe' request for each wanted service name that has
        not been sent on the current introducer connection."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(log.err, facility="tahoe.introducer",
                             level=log.WEIRD)

    def _maybe_publish(self):
        """Send every remembered announcement to the introducer, if we
        have one."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.WEIRD)



    def remote_announce(self, announcements):
        """Handle a batch of announcements. Each one is a 6-tuple of (furl,
        service_name, ri_name, nickname, version, oldest_supported).
        Announcements for services we did not subscribe to, and ones we
        have already seen, are ignored."""
        # log the batch size once, not once per announcement
        self.log("received %d announcements" % len(announcements))
        for ann in announcements:
            (furl, service_name, ri_name, nickname, ver, oldest) = ann
            if service_name not in self._subscribed_service_names:
                self.log("announcement for a service we don't care about [%s]"
                         % (service_name,), level=log.WEIRD)
                continue
            if ann in self._received_announcements:
                self.log("ignoring old announcement: %s" % (ann,),
                         level=log.NOISY)
                continue
            self.log("new announcement[%s]: %s" % (service_name, ann))
            self._received_announcements.add(ann)
            self._new_announcement(ann)

    def _new_announcement(self, announcement):
        """Create and start a connector for this announcement, replacing
        any earlier connector that has the same index."""
        # this will only be called for new announcements
        index = make_index(announcement)
        if index in self._connectors:
            self.log("replacing earlier announcement", level=log.NOISY)
            self._connectors[index].stopConnecting()
        rsc = RemoteServiceConnector(announcement, self._tub, self)
        self._connectors[index] = rsc
        rsc.startConnecting()

    def add_connection(self, nodeid, service_name, rref):
        """Record a newly established connection (called by a
        RemoteServiceConnector)."""
        self._connections.add( (nodeid, service_name, rref) )
        self.counter += 1
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for rsc in self._connectors.values():
            rsc.reset()

    def remove_connection(self, nodeid, service_name, rref):
        """Forget a connection that has been lost (called by a
        RemoteServiceConnector). Unknown connections are ignored."""
        self._connections.discard( (nodeid, service_name, rref) )
        self.counter += 1


    def get_all_connections(self):
        """Return a frozenset of (peerid, service_name, rref) tuples."""
        return frozenset(self._connections)

    def get_all_connectors(self):
        """Return a copy of the dict mapping index to
        RemoteServiceConnector."""
        return self._connectors.copy()

    def get_all_peerids(self):
        """Return a frozenset of the peerids we are connected to."""
        return frozenset([peerid
                          for (peerid, service_name, rref)
                          in self._connections])

    def get_all_connections_for(self, service_name):
        """Return a frozenset of the (peerid, service_name, rref) tuples
        for the given service_name."""
        return frozenset([c
                          for c in self._connections
                          if c[1] == service_name])

    def get_permuted_peers(self, service_name, key):
        """Return an ordered list of (peerid, rref) tuples.

        Only connections for service_name are included. They are ordered
        by the SHA digest of (key + peerid).
        """

        results = []
        for (c_peerid, c_service_name, rref) in self._connections:
            assert isinstance(c_peerid, str)
            if c_service_name != service_name:
                continue
            permuted = sha.new(key + c_peerid).digest()
            results.append((permuted, c_peerid, rref))

        # order by the permuted digest only; peerid and rref are not compared
        results.sort(key=lambda r: r[0])
        return [ (r[1], r[2]) for r in results ]



    def remote_set_encoding_parameters(self, parameters):
        """Remember the given encoding parameters."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True if we currently have a connection to the
        introducer."""
        return self._connected

    def debug_disconnect_from_peerid(self, victim_nodeid):
        """Sever every connection to the given peerid."""
        # for unit tests: locate and sever all connections to the given
        # peerid.
        # iterate over a snapshot: if a transport reports the loss
        # synchronously, remove_connection() would otherwise modify the set
        # while we are iterating over it
        for (nodeid, service_name, rref) in list(self._connections):
            if nodeid == victim_nodeid:
                rref.tracker.broker.transport.loseConnection()