]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/old.py
introducer: move the relevant interfaces out to introducer/interfaces.py
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / old.py
1
2 # We keep a copy of the old introducer (both client and server) here to
3 # support compatibility tests. The old client is supposed to handle the new
4 # server, and new client is supposed to handle the old server.
5
6 import re, time, sha
7 from base64 import b32decode
8 from zope.interface import implements
9 from twisted.application import service
10 from foolscap import Referenceable
11 from allmydata.util import log, idlib
12 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
13      IIntroducerClient, RIIntroducerPublisherAndSubscriberService
14 from allmydata.introducer.common import make_index
15
16 class RemoteServiceConnector:
17     """I hold information about a peer service that we want to connect to. If
18     we are connected, I hold the RemoteReference, the peer's address, and the
19     peer's version information. I remember information about when we were
20     last connected to the peer too, even if we aren't currently connected.
21
22     @ivar announcement_time: when we first heard about this service
23     @ivar last_connect_time: when we last established a connection
24     @ivar last_loss_time: when we last lost a connection
25
26     @ivar version: the peer's version, from the most recent announcement
27     @ivar oldest_supported: the peer's oldest supported version, same
28     @ivar nickname: the peer's self-reported nickname, same
29
30     @ivar rref: the RemoteReference, if connected, otherwise None
31     @ivar remote_host: the IAddress, if connected, otherwise None
32     """
33
34     def __init__(self, announcement, tub, ic):
35         self._tub = tub
36         self._announcement = announcement
37         self._ic = ic
38         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
39
40         self._furl = furl
41         m = re.match(r'pb://(\w+)@', furl)
42         assert m
43         self._nodeid = b32decode(m.group(1).upper())
44         self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)
45
46         self.service_name = service_name
47
48         self.log("attempting to connect to %s" % self._nodeid_s)
49         self.announcement_time = time.time()
50         self.last_loss_time = None
51         self.rref = None
52         self.remote_host = None
53         self.last_connect_time = None
54         self.version = ver
55         self.oldest_supported = oldest
56         self.nickname = nickname
57
58     def log(self, *args, **kwargs):
59         return self._ic.log(*args, **kwargs)
60
61     def startConnecting(self):
62         self._reconnector = self._tub.connectTo(self._furl, self._got_service)
63
64     def stopConnecting(self):
65         self._reconnector.stopConnecting()
66
67     def _got_service(self, rref):
68         self.last_connect_time = time.time()
69         self.remote_host = rref.tracker.broker.transport.getPeer()
70
71         self.rref = rref
72         self.log("connected to %s" % self._nodeid_s)
73
74         self._ic.add_connection(self._nodeid, self.service_name, rref)
75
76         rref.notifyOnDisconnect(self._lost, rref)
77
78     def _lost(self, rref):
79         self.log("lost connection to %s" % self._nodeid_s)
80         self.last_loss_time = time.time()
81         self.rref = None
82         self.remote_host = None
83         self._ic.remove_connection(self._nodeid, self.service_name, rref)
84
85     def reset(self):
86         self._reconnector.reset()
87
88
89 class IntroducerClient_V1(service.Service, Referenceable):
90     implements(RIIntroducerSubscriberClient, IIntroducerClient)
91
92     def __init__(self, tub, introducer_furl,
93                  nickname, my_version, oldest_supported):
94         self._tub = tub
95         self.introducer_furl = introducer_furl
96
97         self._nickname = nickname
98         self._my_version = my_version
99         self._oldest_supported = oldest_supported
100
101         self._published_announcements = set()
102
103         self._publisher = None
104         self._connected = False
105
106         self._subscribed_service_names = set()
107         self._subscriptions = set() # requests we've actually sent
108         self._received_announcements = set()
109         # TODO: this set will grow without bound, until the node is restarted
110
111         # we only accept one announcement per (peerid+service_name) pair.
112         # This insures that an upgraded host replace their previous
113         # announcement. It also means that each peer must have their own Tub
114         # (no sharing), which is slightly weird but consistent with the rest
115         # of the Tahoe codebase.
116         self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
117         # self._connections is a set of (peerid, service_name, rref) tuples
118         self._connections = set()
119
120         self.counter = 0 # incremented each time we change state, for tests
121         self.encoding_parameters = None
122
123     def startService(self):
124         service.Service.startService(self)
125         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
126         self._introducer_reconnector = rc
127         def connect_failed(failure):
128             self.log("Initial Introducer connection failed: perhaps it's down",
129                      level=log.WEIRD, failure=failure)
130         d = self._tub.getReference(self.introducer_furl)
131         d.addErrback(connect_failed)
132
133     def _got_introducer(self, publisher):
134         self.log("connected to introducer")
135         self._connected = True
136         self._publisher = publisher
137         publisher.notifyOnDisconnect(self._disconnected)
138         self._maybe_publish()
139         self._maybe_subscribe()
140
141     def _disconnected(self):
142         self.log("bummer, we've lost our connection to the introducer")
143         self._connected = False
144         self._publisher = None
145         self._subscriptions.clear()
146
147     def stopService(self):
148         service.Service.stopService(self)
149         self._introducer_reconnector.stopConnecting()
150         for rsc in self._connectors.itervalues():
151             rsc.stopConnecting()
152
153     def log(self, *args, **kwargs):
154         if "facility" not in kwargs:
155             kwargs["facility"] = "tahoe.introducer"
156         return log.msg(*args, **kwargs)
157
158
159     def publish(self, furl, service_name, remoteinterface_name):
160         ann = (furl, service_name, remoteinterface_name,
161                self._nickname, self._my_version, self._oldest_supported)
162         self._published_announcements.add(ann)
163         self._maybe_publish()
164
165     def subscribe_to(self, service_name):
166         self._subscribed_service_names.add(service_name)
167         self._maybe_subscribe()
168
169     def _maybe_subscribe(self):
170         if not self._publisher:
171             self.log("want to subscribe, but no introducer yet",
172                      level=log.NOISY)
173             return
174         for service_name in self._subscribed_service_names:
175             if service_name not in self._subscriptions:
176                 # there is a race here, but the subscription desk ignores
177                 # duplicate requests.
178                 self._subscriptions.add(service_name)
179                 d = self._publisher.callRemote("subscribe", self, service_name)
180                 d.addErrback(log.err, facility="tahoe.introducer",
181                              level=log.WEIRD)
182
183     def _maybe_publish(self):
184         if not self._publisher:
185             self.log("want to publish, but no introducer yet", level=log.NOISY)
186             return
187         # this re-publishes everything. The Introducer ignores duplicates
188         for ann in self._published_announcements:
189             d = self._publisher.callRemote("publish", ann)
190             d.addErrback(log.err, facility="tahoe.introducer",
191                          level=log.WEIRD)
192
193
194
195     def remote_announce(self, announcements):
196         for ann in announcements:
197             self.log("received %d announcements" % len(announcements))
198             (furl, service_name, ri_name, nickname, ver, oldest) = ann
199             if service_name not in self._subscribed_service_names:
200                 self.log("announcement for a service we don't care about [%s]"
201                          % (service_name,), level=log.WEIRD)
202                 continue
203             if ann in self._received_announcements:
204                 self.log("ignoring old announcement: %s" % (ann,),
205                          level=log.NOISY)
206                 continue
207             self.log("new announcement[%s]: %s" % (service_name, ann))
208             self._received_announcements.add(ann)
209             self._new_announcement(ann)
210
211     def _new_announcement(self, announcement):
212         # this will only be called for new announcements
213         index = make_index(announcement)
214         if index in self._connectors:
215             self.log("replacing earlier announcement", level=log.NOISY)
216             self._connectors[index].stopConnecting()
217         rsc = RemoteServiceConnector(announcement, self._tub, self)
218         self._connectors[index] = rsc
219         rsc.startConnecting()
220
221     def add_connection(self, nodeid, service_name, rref):
222         self._connections.add( (nodeid, service_name, rref) )
223         self.counter += 1
224         # when one connection is established, reset the timers on all others,
225         # to trigger a reconnection attempt in one second. This is intended
226         # to accelerate server connections when we've been offline for a
227         # while. The goal is to avoid hanging out for a long time with
228         # connections to only a subset of the servers, which would increase
229         # the chances that we'll put shares in weird places (and not update
230         # existing shares of mutable files). See #374 for more details.
231         for rsc in self._connectors.values():
232             rsc.reset()
233
234     def remove_connection(self, nodeid, service_name, rref):
235         self._connections.discard( (nodeid, service_name, rref) )
236         self.counter += 1
237
238
239     def get_all_connections(self):
240         return frozenset(self._connections)
241
242     def get_all_connectors(self):
243         return self._connectors.copy()
244
245     def get_all_peerids(self):
246         return frozenset([peerid
247                           for (peerid, service_name, rref)
248                           in self._connections])
249
250     def get_all_connections_for(self, service_name):
251         return frozenset([c
252                           for c in self._connections
253                           if c[1] == service_name])
254
255     def get_permuted_peers(self, service_name, key):
256         """Return an ordered list of (peerid, rref) tuples."""
257
258         results = []
259         for (c_peerid, c_service_name, rref) in self._connections:
260             assert isinstance(c_peerid, str)
261             if c_service_name != service_name:
262                 continue
263             permuted = sha.new(key + c_peerid).digest()
264             results.append((permuted, c_peerid, rref))
265
266         results.sort(lambda a,b: cmp(a[0], b[0]))
267         return [ (r[1], r[2]) for r in results ]
268
269
270
271     def remote_set_encoding_parameters(self, parameters):
272         self.encoding_parameters = parameters
273
274     def connected_to_introducer(self):
275         return self._connected
276
277     def debug_disconnect_from_peerid(self, victim_nodeid):
278         # for unit tests: locate and sever all connections to the given
279         # peerid.
280         for (nodeid, service_name, rref) in self._connections:
281             if nodeid == victim_nodeid:
282                 rref.tracker.broker.transport.loseConnection()
283
284
285 class IntroducerService_V1(service.MultiService, Referenceable):
286     implements(RIIntroducerPublisherAndSubscriberService)
287     name = "introducer"
288
289     def __init__(self, basedir="."):
290         service.MultiService.__init__(self)
291         self.introducer_url = None
292         # 'index' is (tubid, service_name)
293         self._announcements = {} # dict of index -> (announcement, timestamp)
294         self._subscribers = {} # dict of (rref->timestamp) dicts
295
296     def log(self, *args, **kwargs):
297         if "facility" not in kwargs:
298             kwargs["facility"] = "tahoe.introducer"
299         return log.msg(*args, **kwargs)
300
301     def get_announcements(self):
302         return self._announcements
303     def get_subscribers(self):
304         return self._subscribers
305
306     def remote_publish(self, announcement):
307         self.log("introducer: announcement published: %s" % (announcement,) )
308         index = make_index(announcement)
309         if index in self._announcements:
310             (old_announcement, timestamp) = self._announcements[index]
311             if old_announcement == announcement:
312                 self.log("but we already knew it, ignoring", level=log.NOISY)
313                 return
314             else:
315                 self.log("old announcement being updated", level=log.NOISY)
316         self._announcements[index] = (announcement, time.time())
317         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
318         for s in self._subscribers.get(service_name, []):
319             s.callRemote("announce", set([announcement]))
320
321     def remote_subscribe(self, subscriber, service_name):
322         self.log("introducer: subscription[%s] request at %s" % (service_name,
323                                                                  subscriber))
324         if service_name not in self._subscribers:
325             self._subscribers[service_name] = {}
326         subscribers = self._subscribers[service_name]
327         if subscriber in subscribers:
328             self.log("but they're already subscribed, ignoring",
329                      level=log.UNUSUAL)
330             return
331         subscribers[subscriber] = time.time()
332         def _remove():
333             self.log("introducer: unsubscribing[%s] %s" % (service_name,
334                                                            subscriber))
335             subscribers.pop(subscriber, None)
336         subscriber.notifyOnDisconnect(_remove)
337
338         announcements = set( [ ann
339                                for idx,(ann,when) in self._announcements.items()
340                                if idx[1] == service_name] )
341         d = subscriber.callRemote("announce", announcements)
342         d.addErrback(log.err, facility="tahoe.introducer", level=log.UNUSUAL)