]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/old.py
remove introducer's set_encoding_parameters
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / old.py
1
2 import time
3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
6 import allmydata
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10     RemoteInterface, Referenceable, eventually, SturdyRef
11 from allmydata.introducer.common import SubscriberDescriptor, \
12      AnnouncementDescriptor
13 FURL = StringConstraint(1000)
14
15 # We keep a copy of the old introducer (both client and server) here to
16 # support compatibility tests. The old client is supposed to handle the new
17 # server, and new client is supposed to handle the old server.
18
19
20 # Announcements are (FURL, service_name, remoteinterface_name,
21 #                    nickname, my_version, oldest_supported)
22 #  the (FURL, service_name, remoteinterface_name) refer to the service being
23 #  announced. The (nickname, my_version, oldest_supported) refer to the
24 #  client as a whole. The my_version/oldest_supported strings can be parsed
25 #  by an allmydata.util.version.Version instance, and then compared. The
26 #  first goal is to make sure that nodes are not confused by speaking to an
27 #  incompatible peer. The second goal is to enable the development of
28 #  backwards-compatibility code.
29
30 Announcement = TupleOf(FURL, str, str,
31                        str, str, str)
32
33 class RIIntroducerSubscriberClient_v1(RemoteInterface):
34     __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
35
36     def announce(announcements=SetOf(Announcement)):
37         """I accept announcements from the publisher."""
38         return None
39
40 # When Foolscap can handle multiple interfaces (Foolscap#17), the
41 # full-powered introducer will implement both RIIntroducerPublisher and
42 # RIIntroducerSubscriberService. Until then, we define
43 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
44 # make everybody use that.
45
46 class RIIntroducerPublisher_v1(RemoteInterface):
47     """To publish a service to the world, connect to me and give me your
48     announcement message. I will deliver a copy to all connected subscribers."""
49     __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
50
51     def publish(announcement=Announcement):
52         # canary?
53         return None
54
55 class RIIntroducerSubscriberService_v1(RemoteInterface):
56     __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
57
58     def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
59         """Give me a subscriber reference, and I will call its new_peers()
60         method will any announcements that match the desired service name. I
61         will ignore duplicate subscriptions.
62         """
63         return None
64
65 class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
66     __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
67     def get_version():
68         return DictOf(str, Any())
69     def publish(announcement=Announcement):
70         return None
71     def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
72         return None
73
74 class IIntroducerClient(Interface):
75     """I provide service introduction facilities for a node. I help nodes
76     publish their services to the rest of the world, and I help them learn
77     about services available on other nodes."""
78
79     def publish(furl, service_name, remoteinterface_name):
80         """Once you call this, I will tell the world that the Referenceable
81         available at FURL is available to provide a service named
82         SERVICE_NAME. The precise definition of the service being provided is
83         identified by the Foolscap 'remote interface name' in the last
84         parameter: this is supposed to be a globally-unique string that
85         identifies the RemoteInterface that is implemented."""
86
87     def subscribe_to(service_name, callback, *args, **kwargs):
88         """Call this if you will eventually want to use services with the
89         given SERVICE_NAME. This will prompt me to subscribe to announcements
90         of those services. Your callback will be invoked with at least two
91         arguments: a serverid (binary string), and an announcement
92         dictionary, followed by any additional callback args/kwargs you give
93         me. I will run your callback for both new announcements and for
94         announcements that have changed, but you must be prepared to tolerate
95         duplicates.
96
97         The announcement dictionary that I give you will have the following
98         keys:
99
100          version: 0
101          service-name: str('storage')
102
103          FURL: str(furl)
104          remoteinterface-name: str(ri_name)
105          nickname: unicode
106          app-versions: {}
107          my-version: str
108          oldest-supported: str
109
110         Note that app-version will be an empty dictionary until #466 is done
111         and both the introducer and the remote client have been upgraded. For
112         current (native) server types, the serverid will always be equal to
113         the binary form of the FURL's tubid.
114         """
115
116     def connected_to_introducer():
117         """Returns a boolean, True if we are currently connected to the
118         introducer, False if not."""
119
120
121 class IntroducerClient_v1(service.Service, Referenceable):
122     implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
123
124     def __init__(self, tub, introducer_furl,
125                  nickname, my_version, oldest_supported):
126         self._tub = tub
127         self.introducer_furl = introducer_furl
128
129         assert type(nickname) is unicode
130         self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
131         self._my_version = my_version
132         self._oldest_supported = oldest_supported
133
134         self._published_announcements = set()
135
136         self._publisher = None
137
138         self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
139         self._subscribed_service_names = set()
140         self._subscriptions = set() # requests we've actually sent
141
142         # _current_announcements remembers one announcement per
143         # (servicename,serverid) pair. Anything that arrives with the same
144         # pair will displace the previous one. This stores unpacked
145         # announcement dictionaries, which can be compared for equality to
146         # distinguish re-announcement from updates. It also provides memory
147         # for clients who subscribe after startup.
148         self._current_announcements = {}
149
150         # hooks for unit tests
151         self._debug_counts = {
152             "inbound_message": 0,
153             "inbound_announcement": 0,
154             "wrong_service": 0,
155             "duplicate_announcement": 0,
156             "update": 0,
157             "new_announcement": 0,
158             "outbound_message": 0,
159             }
160         self._debug_outstanding = 0
161
162     def _debug_retired(self, res):
163         self._debug_outstanding -= 1
164         return res
165
166     def startService(self):
167         service.Service.startService(self)
168         self._introducer_error = None
169         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
170         self._introducer_reconnector = rc
171         def connect_failed(failure):
172             self.log("Initial Introducer connection failed: perhaps it's down",
173                      level=log.WEIRD, failure=failure, umid="c5MqUQ")
174         d = self._tub.getReference(self.introducer_furl)
175         d.addErrback(connect_failed)
176
177     def _got_introducer(self, publisher):
178         self.log("connected to introducer, getting versions")
179         default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
180                     { },
181                     "application-version": "unknown: no get_version()",
182                     }
183         d = rrefutil.add_version_to_remote_reference(publisher, default)
184         d.addCallback(self._got_versioned_introducer)
185         d.addErrback(self._got_error)
186
187     def _got_error(self, f):
188         # TODO: for the introducer, perhaps this should halt the application
189         self._introducer_error = f # polled by tests
190
191     def _got_versioned_introducer(self, publisher):
192         self.log("got introducer version: %s" % (publisher.version,))
193         # we require a V1 introducer
194         needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
195         if needed not in publisher.version:
196             raise InsufficientVersionError(needed, publisher.version)
197         self._publisher = publisher
198         publisher.notifyOnDisconnect(self._disconnected)
199         self._maybe_publish()
200         self._maybe_subscribe()
201
202     def _disconnected(self):
203         self.log("bummer, we've lost our connection to the introducer")
204         self._publisher = None
205         self._subscriptions.clear()
206
207     def log(self, *args, **kwargs):
208         if "facility" not in kwargs:
209             kwargs["facility"] = "tahoe.introducer"
210         return log.msg(*args, **kwargs)
211
212
213     def publish(self, furl, service_name, remoteinterface_name):
214         assert type(self._nickname_utf8) is str # we always send UTF-8
215         ann = (furl, service_name, remoteinterface_name,
216                self._nickname_utf8, self._my_version, self._oldest_supported)
217         self._published_announcements.add(ann)
218         self._maybe_publish()
219
220     def subscribe_to(self, service_name, cb, *args, **kwargs):
221         self._local_subscribers.append( (service_name,cb,args,kwargs) )
222         self._subscribed_service_names.add(service_name)
223         self._maybe_subscribe()
224         for (servicename,nodeid),ann_d in self._current_announcements.items():
225             if servicename == service_name:
226                 eventually(cb, nodeid, ann_d)
227
228     def _maybe_subscribe(self):
229         if not self._publisher:
230             self.log("want to subscribe, but no introducer yet",
231                      level=log.NOISY)
232             return
233         for service_name in self._subscribed_service_names:
234             if service_name not in self._subscriptions:
235                 # there is a race here, but the subscription desk ignores
236                 # duplicate requests.
237                 self._subscriptions.add(service_name)
238                 self._debug_outstanding += 1
239                 d = self._publisher.callRemote("subscribe", self, service_name)
240                 d.addBoth(self._debug_retired)
241                 d.addErrback(rrefutil.trap_deadref)
242                 d.addErrback(log.err, format="server errored during subscribe",
243                              facility="tahoe.introducer",
244                              level=log.WEIRD, umid="2uMScQ")
245
246     def _maybe_publish(self):
247         if not self._publisher:
248             self.log("want to publish, but no introducer yet", level=log.NOISY)
249             return
250         # this re-publishes everything. The Introducer ignores duplicates
251         for ann in self._published_announcements:
252             self._debug_counts["outbound_message"] += 1
253             self._debug_outstanding += 1
254             d = self._publisher.callRemote("publish", ann)
255             d.addBoth(self._debug_retired)
256             d.addErrback(rrefutil.trap_deadref)
257             d.addErrback(log.err,
258                          format="server errored during publish %(ann)s",
259                          ann=ann, facility="tahoe.introducer",
260                          level=log.WEIRD, umid="xs9pVQ")
261
262
263
264     def remote_announce(self, announcements):
265         self.log("received %d announcements" % len(announcements))
266         self._debug_counts["inbound_message"] += 1
267         for ann in announcements:
268             try:
269                 self._process_announcement(ann)
270             except:
271                 log.err(format="unable to process announcement %(ann)s",
272                         ann=ann)
273                 # Don't let a corrupt announcement prevent us from processing
274                 # the remaining ones. Don't return an error to the server,
275                 # since they'd just ignore it anyways.
276                 pass
277
278     def _process_announcement(self, ann):
279         self._debug_counts["inbound_announcement"] += 1
280         (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
281         if service_name not in self._subscribed_service_names:
282             self.log("announcement for a service we don't care about [%s]"
283                      % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
284             self._debug_counts["wrong_service"] += 1
285             return
286         self.log("announcement for [%s]: %s" % (service_name, ann),
287                  umid="BoKEag")
288         assert type(furl) is str
289         assert type(service_name) is str
290         assert type(ri_name) is str
291         assert type(nickname_utf8) is str
292         nickname = nickname_utf8.decode("utf-8")
293         assert type(nickname) is unicode
294         assert type(ver) is str
295         assert type(oldest) is str
296
297         nodeid = b32decode(SturdyRef(furl).tubID.upper())
298         nodeid_s = idlib.shortnodeid_b2a(nodeid)
299
300         ann_d = { "version": 0,
301                   "service-name": service_name,
302
303                   "FURL": furl,
304                   "nickname": nickname,
305                   "app-versions": {}, # need #466 and v2 introducer
306                   "my-version": ver,
307                   "oldest-supported": oldest,
308                   }
309
310         index = (service_name, nodeid)
311         if self._current_announcements.get(index, None) == ann_d:
312             self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
313                      service=service_name, nodeid=nodeid_s,
314                      level=log.UNUSUAL, umid="B1MIdA")
315             self._debug_counts["duplicate_announcement"] += 1
316             return
317         if index in self._current_announcements:
318             self._debug_counts["update"] += 1
319         else:
320             self._debug_counts["new_announcement"] += 1
321
322         self._current_announcements[index] = ann_d
323         # note: we never forget an index, but we might update its value
324
325         for (service_name2,cb,args,kwargs) in self._local_subscribers:
326             if service_name2 == service_name:
327                 eventually(cb, nodeid, ann_d, *args, **kwargs)
328
329     def connected_to_introducer(self):
330         return bool(self._publisher)
331
332 class IntroducerService_v1(service.MultiService, Referenceable):
333     implements(RIIntroducerPublisherAndSubscriberService_v1)
334     name = "introducer"
335     VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
336                  { },
337                 "application-version": str(allmydata.__full_version__),
338                 }
339
340     def __init__(self, basedir="."):
341         service.MultiService.__init__(self)
342         self.introducer_url = None
343         # 'index' is (service_name, tubid)
344         self._announcements = {} # dict of index -> (announcement, timestamp)
345         self._subscribers = {} # [service_name]->[rref]->timestamp
346         self._debug_counts = {"inbound_message": 0,
347                               "inbound_duplicate": 0,
348                               "inbound_update": 0,
349                               "outbound_message": 0,
350                               "outbound_announcements": 0,
351                               "inbound_subscribe": 0}
352         self._debug_outstanding = 0
353
354     def _debug_retired(self, res):
355         self._debug_outstanding -= 1
356         return res
357
358     def log(self, *args, **kwargs):
359         if "facility" not in kwargs:
360             kwargs["facility"] = "tahoe.introducer"
361         return log.msg(*args, **kwargs)
362
363     def get_announcements(self, include_stub_clients=True):
364         announcements = []
365         for index, (ann_t, when) in self._announcements.items():
366             (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
367             if service_name == "stub_client" and not include_stub_clients:
368                 continue
369             ann_d = {"nickname": nickname.decode("utf-8", "replace"),
370                      "my-version": ver,
371                      "service-name": service_name,
372                      "anonymous-storage-FURL": furl,
373                      }
374             # the V2 introducer uses (service_name, key_s, tubid_s) as an
375             # index, so match that format for AnnouncementDescriptor
376             new_index = (index[0], None, idlib.nodeid_b2a(index[1]))
377             ad = AnnouncementDescriptor(when, new_index, None, ann_d)
378             announcements.append(ad)
379         return announcements
380
381     def get_subscribers(self):
382         s = []
383         for service_name, subscribers in self._subscribers.items():
384             for rref, when in subscribers.items():
385                 tubid = rref.getRemoteTubID() or "?"
386                 remote_address = rrefutil.stringify_remote_address(rref)
387                 nickname, version, app_versions = u"?", u"?", {}
388                 sd = SubscriberDescriptor(service_name, when,
389                                           nickname, version, app_versions,
390                                           remote_address, tubid)
391                 s.append(sd)
392         return s
393
394     def remote_get_version(self):
395         return self.VERSION
396
397     def remote_publish(self, announcement):
398         try:
399             self._publish(announcement)
400         except:
401             log.err(format="Introducer.remote_publish failed on %(ann)s",
402                     ann=announcement, level=log.UNUSUAL, umid="620rWA")
403             raise
404
405     def _publish(self, announcement):
406         self._debug_counts["inbound_message"] += 1
407         self.log("introducer: announcement published: %s" % (announcement,) )
408         (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
409         #print "PUB", service_name, nickname_utf8
410
411         nodeid = b32decode(SturdyRef(furl).tubID.upper())
412         index = (service_name, nodeid)
413
414         if index in self._announcements:
415             (old_announcement, timestamp) = self._announcements[index]
416             if old_announcement == announcement:
417                 self.log("but we already knew it, ignoring", level=log.NOISY)
418                 self._debug_counts["inbound_duplicate"] += 1
419                 return
420             else:
421                 self.log("old announcement being updated", level=log.NOISY)
422                 self._debug_counts["inbound_update"] += 1
423         self._announcements[index] = (announcement, time.time())
424
425         for s in self._subscribers.get(service_name, []):
426             self._debug_counts["outbound_message"] += 1
427             self._debug_counts["outbound_announcements"] += 1
428             self._debug_outstanding += 1
429             d = s.callRemote("announce", set([announcement]))
430             d.addBoth(self._debug_retired)
431             d.addErrback(rrefutil.trap_deadref)
432             d.addErrback(log.err,
433                          format="subscriber errored on announcement %(ann)s",
434                          ann=announcement, facility="tahoe.introducer",
435                          level=log.UNUSUAL, umid="jfGMXQ")
436
437     def remote_subscribe(self, subscriber, service_name):
438         self.log("introducer: subscription[%s] request at %s" % (service_name,
439                                                                  subscriber))
440         self._debug_counts["inbound_subscribe"] += 1
441         if service_name not in self._subscribers:
442             self._subscribers[service_name] = {}
443         subscribers = self._subscribers[service_name]
444         if subscriber in subscribers:
445             self.log("but they're already subscribed, ignoring",
446                      level=log.UNUSUAL)
447             return
448         subscribers[subscriber] = time.time()
449         def _remove():
450             self.log("introducer: unsubscribing[%s] %s" % (service_name,
451                                                            subscriber))
452             subscribers.pop(subscriber, None)
453         subscriber.notifyOnDisconnect(_remove)
454
455         announcements = set(
456             [ ann
457               for (sn2,nodeid),(ann,when) in self._announcements.items()
458               if sn2 == service_name] )
459
460         self._debug_counts["outbound_message"] += 1
461         self._debug_counts["outbound_announcements"] += len(announcements)
462         self._debug_outstanding += 1
463         d = subscriber.callRemote("announce", announcements)
464         d.addBoth(self._debug_retired)
465         d.addErrback(rrefutil.trap_deadref)
466         d.addErrback(log.err,
467                      format="subscriber errored during subscribe %(anns)s",
468                      anns=announcements, facility="tahoe.introducer",
469                      level=log.UNUSUAL, umid="1XChxA")