]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/old.py
new introducer: signed extensible dictionary-based messages! refs #466
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / old.py
1
2 import time
3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
6 import allmydata
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10     RemoteInterface, Referenceable, eventually, SturdyRef
# Foolscap constraint used for FURL fields in the v1 wire schema: a string
# constraint built with the argument 1000 (presumably the maximum length in
# bytes -- confirm against the Foolscap StringConstraint documentation).
FURL = StringConstraint(1000)
12
13 # We keep a copy of the old introducer (both client and server) here to
14 # support compatibility tests. The old client is supposed to handle the new
15 # server, and new client is supposed to handle the old server.
16
17
# Announcements are (FURL, service_name, remoteinterface_name,
#                    nickname, my_version, oldest_supported)
#  the (FURL, service_name, remoteinterface_name) refer to the service being
#  announced. The (nickname, my_version, oldest_supported) refer to the
#  client as a whole. The my_version/oldest_supported strings can be parsed
#  by an allmydata.util.version.Version instance, and then compared. The
#  first goal is to make sure that nodes are not confused by speaking to an
#  incompatible peer. The second goal is to enable the development of
#  backwards-compatibility code.
#
#  The six positions of the tuple below follow exactly the order listed
#  above; the nickname travels as a UTF-8 encoded byte string (see
#  IntroducerClient_v1, which encodes it before sending and decodes it on
#  receipt).

Announcement = TupleOf(FURL, str, str,
                       str, str, str)
30
class RIIntroducerSubscriberClient_v1(RemoteInterface):
    """Remote interface offered by a v1 subscriber (client) to the
    introducer. It declares the two messages that can be delivered to the
    client: announce() and set_encoding_parameters()."""
    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"

    def announce(announcements=SetOf(Announcement)):
        """I accept announcements from the publisher. 'announcements' is a
        set of Announcement tuples."""
        return None

    def set_encoding_parameters(parameters=(int, int, int)):
        """Advise the client of the recommended k-of-n encoding parameters
        for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
        is the total number of shares that will be created for any given
        file, while 'k' is the number of shares that must be retrieved to
        recover that file, and 'desired' is the minimum number of shares that
        must be placed before the uploader will consider its job a success.
        n/k is the expansion ratio, while k determines the robustness.

        Introducers should specify 'n' according to the expected size of the
        grid (there is no point to producing more shares than there are
        peers), and k according to the desired reliability-vs-overhead goals.

        Note that setting k=1 is equivalent to simple replication.
        """
        return None
54
55 # When Foolscap can handle multiple interfaces (Foolscap#17), the
56 # full-powered introducer will implement both RIIntroducerPublisher and
57 # RIIntroducerSubscriberService. Until then, we define
58 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
59 # make everybody use that.
60
class RIIntroducerPublisher_v1(RemoteInterface):
    """To publish a service to the world, connect to me and give me your
    announcement message. I will deliver a copy to all connected subscribers."""
    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"

    def publish(announcement=Announcement):
        """Accept a single Announcement tuple describing one service."""
        # canary?
        return None
69
class RIIntroducerSubscriberService_v1(RemoteInterface):
    """Subscription half of the v1 introducer: clients hand me a reference
    to themselves and the name of the service they want to hear about."""
    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"

    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
        """Give me a subscriber reference, and I will call its announce()
        method with any announcements that match the desired service name. I
        will ignore duplicate subscriptions.
        """
        return None
79
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
    """Combined publish+subscribe interface of the v1 introducer. It repeats
    the publish() and subscribe() declarations of the two separate
    interfaces and adds get_version()."""
    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
    def get_version():
        """Return a dictionary mapping strings to arbitrary values that
        describes the introducer's version."""
        return DictOf(str, Any())
    def publish(announcement=Announcement):
        """Accept a single Announcement tuple describing one service."""
        return None
    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
        """Register 'subscriber' to receive announcements for
        'service_name'."""
        return None
88
class IIntroducerClient(Interface):
    """I provide service introduction facilities for a node. I help nodes
    publish their services to the rest of the world, and I help them learn
    about services available on other nodes."""

    def publish(furl, service_name, remoteinterface_name):
        """Once you call this, I will tell the world that the Referenceable
        available at FURL is available to provide a service named
        SERVICE_NAME. The precise definition of the service being provided is
        identified by the Foolscap 'remote interface name' in the last
        parameter: this is supposed to be a globally-unique string that
        identifies the RemoteInterface that is implemented."""

    def subscribe_to(service_name, callback, *args, **kwargs):
        """Call this if you will eventually want to use services with the
        given SERVICE_NAME. This will prompt me to subscribe to announcements
        of those services. Your callback will be invoked with at least two
        arguments: a serverid (binary string), and an announcement
        dictionary, followed by any additional callback args/kwargs you give
        me. I will run your callback for both new announcements and for
        announcements that have changed, but you must be prepared to tolerate
        duplicates.

        The announcement dictionary that I give you will have the following
        keys:

         version: 0
         service-name: str('storage')

         FURL: str(furl)
         remoteinterface-name: str(ri_name)
         nickname: unicode
         app-versions: {}
         my-version: str
         oldest-supported: str

        Note that app-versions will be an empty dictionary until #466 is done
        and both the introducer and the remote client have been upgraded. For
        current (native) server types, the serverid will always be equal to
        the binary form of the FURL's tubid.

        Note also that the IntroducerClient_v1 implementation in this file
        does not copy the remote interface name into the dictionary it
        builds, so the 'remoteinterface-name' key is absent from its
        announcements.
        """

    def connected_to_introducer():
        """Returns a boolean, True if we are currently connected to the
        introducer, False if not."""
134
135
class IntroducerClient_v1(service.Service, Referenceable):
    """Old (v1) introducer client, kept to support compatibility tests.

    I connect to the introducer named by 'introducer_furl', publish the
    announcements handed to publish(), subscribe to the service names handed
    to subscribe_to(), and deliver the announcements I receive to the
    registered local callbacks.
    """
    implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Remember the connection parameters and set up empty state.

        'tub' is the object used to reach the introducer (connectTo and
        getReference are called on it in startService). 'nickname' must be a
        unicode object; it is encoded to UTF-8 for transmission.
        'my_version' and 'oldest_supported' are included verbatim in every
        announcement we publish.
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        self._published_announcements = set()

        # remote reference to the introducer, or None while disconnected
        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores unpacked
        # announcement dictionaries, which can be compared for equality to
        # distinguish re-announcement from updates. It also provides memory
        # for clients who subscribe after startup.
        self._current_announcements = {}

        self.encoding_parameters = None

        # hooks for unit tests
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }
        # number of remote calls sent whose Deferred has not fired yet
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """Deferred pass-through: note that one outstanding remote call has
        finished, and hand the result (or failure) on unchanged."""
        self._debug_outstanding -= 1
        return res

    def startService(self):
        """Start the service and begin connecting to the introducer."""
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # This extra getReference() is only here so that a failure of the
        # first connection attempt gets logged by connect_failed; the
        # reference itself is not used.
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Connection callback: attach version information to 'publisher'
        and continue in _got_versioned_introducer."""
        self.log("connected to introducer, getting versions")
        # version dictionary passed to add_version_to_remote_reference as
        # the default (its application-version text suggests it stands in
        # for introducers that lack get_version())
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = rrefutil.add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Record a failure from the version handshake."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Accept 'publisher' as our introducer if it speaks the v1
        protocol, then send any pending publish/subscribe requests.

        Raises InsufficientVersionError if the v1 protocol key is missing
        from publisher.version.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the introducer reference and the subscriptions we had
        sent, so they are re-sent after the next connection."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Log through allmydata.util.log, defaulting the facility to
        'tahoe.introducer'."""
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Remember an announcement for the given service and send it to the
        introducer if we are connected (otherwise it is sent once we are)."""
        assert type(self._nickname_utf8) is str # we always send UTF-8
        ann = (furl, service_name, remoteinterface_name,
               self._nickname_utf8, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register 'cb' for announcements of 'service_name'.

        The callback is invoked (via eventually()) as
        cb(nodeid, ann_d, *args, **kwargs), both for announcements that
        arrive later and for matching announcements we already remember.
        """
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        for (servicename,nodeid),ann_d in self._current_announcements.items():
            if servicename == service_name:
                # Replay remembered announcements with the subscriber's extra
                # arguments, exactly as _process_announcement does for new
                # ones; previously args/kwargs were dropped here.
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send a subscribe request for every wanted service name that has
        not been requested on the current connection. Does nothing but log
        when there is no introducer connection."""
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                self._debug_outstanding += 1
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addBoth(self._debug_retired)
                d.addErrback(rrefutil.trap_deadref)
                d.addErrback(log.err, format="server errored during subscribe",
                             facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        """Send every remembered announcement to the introducer. Does
        nothing but log when there is no introducer connection."""
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_outstanding += 1
            d = self._publisher.callRemote("publish", ann)
            d.addBoth(self._debug_retired)
            d.addErrback(rrefutil.trap_deadref)
            d.addErrback(log.err,
                         format="server errored during publish %(ann)s",
                         ann=ann, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")



    def remote_announce(self, announcements):
        """Remote entry point: process each announcement in the batch,
        logging (but otherwise ignoring) any that cannot be processed."""
        self.log("received %d announcements" % len(announcements))
        self._debug_counts["inbound_message"] += 1
        for ann in announcements:
            try:
                self._process_announcement(ann)
            except Exception:
                # Don't let a corrupt announcement prevent us from processing
                # the remaining ones. Don't return an error to the server,
                # since they'd just ignore it anyways. Only Exception is
                # caught so that KeyboardInterrupt/SystemExit still
                # propagate.
                log.err(format="unable to process announcement %(ann)s",
                        ann=ann)

    def _process_announcement(self, ann):
        """Unpack one announcement tuple, remember it, and notify the local
        subscribers of its service unless it is an exact duplicate.

        Announcements for services we have not subscribed to are counted
        and dropped.
        """
        self._debug_counts["inbound_announcement"] += 1
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        self.log("announcement for [%s]: %s" % (service_name, ann),
                 umid="BoKEag")
        assert type(furl) is str
        assert type(service_name) is str
        assert type(ri_name) is str
        assert type(nickname_utf8) is str
        nickname = nickname_utf8.decode("utf-8")
        assert type(nickname) is unicode
        assert type(ver) is str
        assert type(oldest) is str

        # the serverid is the binary form of the tubid embedded in the FURL
        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        nodeid_s = idlib.shortnodeid_b2a(nodeid)

        # NOTE: ri_name is type-checked above but is not copied into the
        # dictionary handed to subscribers.
        ann_d = { "version": 0,
                  "service-name": service_name,

                  "FURL": furl,
                  "nickname": nickname,
                  "app-versions": {}, # need #466 and v2 introducer
                  "my-version": ver,
                  "oldest-supported": oldest,
                  }

        index = (service_name, nodeid)
        if self._current_announcements.get(index, None) == ann_d:
            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
                     service=service_name, nodeid=nodeid_s,
                     level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        if index in self._current_announcements:
            self._debug_counts["update"] += 1
        else:
            self._debug_counts["new_announcement"] += 1

        self._current_announcements[index] = ann_d
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Remote entry point: store the encoding parameters we were sent."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True if we currently hold an introducer reference."""
        return bool(self._publisher)
351
class IntroducerService_v1(service.MultiService, Referenceable):
    """Old (v1) introducer server, kept to support compatibility tests.

    I remember the latest announcement for each (service_name, nodeid)
    index and forward announcements to every subscriber of the matching
    service name.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v1)
    name = "introducer"
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v1": {},
        "application-version": str(allmydata.__full_version__),
    }

    def __init__(self, basedir="."):
        """Create an introducer with no announcements and no subscribers.
        'basedir' is accepted but not used here."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        # index (service_name, nodeid) -> (announcement, timestamp)
        self._announcements = {}
        # service_name -> {subscriber rref -> subscription timestamp}
        self._subscribers = {}
        # counters inspected by unit tests, all starting at zero
        self._debug_counts = dict.fromkeys(
            ("inbound_message",
             "inbound_duplicate",
             "inbound_update",
             "outbound_message",
             "outbound_announcements",
             "inbound_subscribe"),
            0)
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """Deferred pass-through that marks one remote call as finished."""
        self._debug_outstanding = self._debug_outstanding - 1
        return res

    def log(self, *args, **kwargs):
        """Log via allmydata.util.log with a default facility of
        'tahoe.introducer'."""
        kwargs.setdefault("facility", "tahoe.introducer")
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return the index -> (announcement, timestamp) dictionary."""
        return self._announcements

    def get_subscribers(self):
        """Return the service_name -> {subscriber: timestamp} dictionary."""
        return self._subscribers

    def remote_get_version(self):
        """Remote entry point: report our VERSION dictionary."""
        return self.VERSION

    def remote_publish(self, announcement):
        """Remote entry point: record and forward 'announcement'. Any
        failure is logged and then re-raised to the caller."""
        try:
            self._publish(announcement)
        except:
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
            raise

    def _publish(self, announcement):
        """Store 'announcement' under its (service_name, nodeid) index and
        send it to the subscribers of that service, unless it is identical
        to the one already stored."""
        counts = self._debug_counts
        counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (announcement,) )
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement

        # the nodeid is the binary form of the tubid embedded in the FURL
        tubid_b32 = SturdyRef(furl).tubID.upper()
        index = (service_name, b32decode(tubid_b32))

        known = self._announcements.get(index)
        if known is not None:
            (old_announcement, timestamp) = known
            if old_announcement == announcement:
                self.log("but we already knew it, ignoring", level=log.NOISY)
                counts["inbound_duplicate"] += 1
                return
            self.log("old announcement being updated", level=log.NOISY)
            counts["inbound_update"] += 1
        self._announcements[index] = (announcement, time.time())

        for subscriber in self._subscribers.get(service_name, []):
            counts["outbound_message"] += 1
            counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce", set([announcement]))
            d.addBoth(self._debug_retired)
            d.addErrback(rrefutil.trap_deadref)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=announcement, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe(self, subscriber, service_name):
        """Remote entry point: register 'subscriber' for 'service_name' and
        send it every announcement already known for that service.
        Repeated subscriptions from the same subscriber are ignored."""
        self.log("introducer: subscription[%s] request at %s" % (service_name,
                                                                 subscriber))
        self._debug_counts["inbound_subscribe"] += 1
        subscribers = self._subscribers.setdefault(service_name, {})
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL)
            return
        subscribers[subscriber] = time.time()

        def _remove():
            # drop the subscriber again once its connection goes away
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber))
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # everything we already know about the requested service
        announcements = set(
            ann
            for ((sn2, nodeid), (ann, when)) in self._announcements.items()
            if sn2 == service_name)

        self._debug_counts["outbound_message"] += 1
        self._debug_counts["outbound_announcements"] += len(announcements)
        self._debug_outstanding += 1
        d = subscriber.callRemote("announce", announcements)
        d.addBoth(self._debug_retired)
        d.addErrback(rrefutil.trap_deadref)
        d.addErrback(log.err,
                     format="subscriber errored during subscribe %(anns)s",
                     anns=announcements, facility="tahoe.introducer",
                     level=log.UNUSUAL, umid="1XChxA")