]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/introducer/old.py
3c2fcb7122510e69bc0cc8d0f4c8ab1e2d89bbdf
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / introducer / old.py
1
2 import time
3 from base64 import b32decode
4 from zope.interface import implements, Interface
5 from twisted.application import service
6 import allmydata
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.util import log, idlib, rrefutil
9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
10     RemoteInterface, Referenceable, eventually, SturdyRef
11 from allmydata.introducer.common import SubscriberDescriptor, \
12      AnnouncementDescriptor
# Foolscap wire constraint for a FURL: a string of maximum length 1000.
FURL = StringConstraint(1000)

# We keep a copy of the old introducer (both client and server) here to
# support compatibility tests. The old client is supposed to handle the new
# server, and the new client is supposed to handle the old server.


# Announcements are (FURL, service_name, remoteinterface_name,
#                    nickname, my_version, oldest_supported)
#  the (FURL, service_name, remoteinterface_name) refer to the service being
#  announced. The (nickname, my_version, oldest_supported) refer to the
#  client as a whole. The my_version/oldest_supported strings can be parsed
#  by an allmydata.util.version.Version instance, and then compared. The
#  first goal is to make sure that nodes are not confused by speaking to an
#  incompatible peer. The second goal is to enable the development of
#  backwards-compatibility code.

# Wire constraint for one announcement: a FURL followed by five plain
# strings, in the order listed above.
Announcement = TupleOf(FURL, str, str,
                       str, str, str)
32
class RIIntroducerSubscriberClient_v1(RemoteInterface):
    # Remote interface offered by v1 subscribers. The introducer in this file
    # calls announce() on it to deliver sets of Announcement tuples.
    # Foolscap reads each method's parameter defaults as argument constraints
    # and the returned object as the response constraint, so the method
    # bodies below are schema declarations, not executable logic.
    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"

    def announce(announcements=SetOf(Announcement)):
        """I accept announcements from the publisher."""
        return None

    def set_encoding_parameters(parameters=(int, int, int)):
        """Advise the client of the recommended k-of-n encoding parameters
        for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
        is the total number of shares that will be created for any given
        file, while 'k' is the number of shares that must be retrieved to
        recover that file, and 'desired' is the minimum number of shares that
        must be placed before the uploader will consider its job a success.
        n/k is the expansion ratio, while k determines the robustness.

        Introducers should specify 'n' according to the expected size of the
        grid (there is no point to producing more shares than there are
        peers), and k according to the desired reliability-vs-overhead goals.

        Note that setting k=1 is equivalent to simple replication.
        """
        return None
56
57 # When Foolscap can handle multiple interfaces (Foolscap#17), the
58 # full-powered introducer will implement both RIIntroducerPublisher and
59 # RIIntroducerSubscriberService. Until then, we define
60 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
61 # make everybody use that.
62
class RIIntroducerPublisher_v1(RemoteInterface):
    """To publish a service to the world, connect to me and give me your
    announcement message. I will deliver a copy to all connected subscribers."""
    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"

    def publish(announcement=Announcement):
        # canary?  (open question left by the original author: whether this
        # call should also take a canary argument)
        return None
71
class RIIntroducerSubscriberService_v1(RemoteInterface):
    """The subscription half of the v1 introducer's remote interface."""
    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"

    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
        """Give me a subscriber reference, and I will call its announce()
        method with any announcements that match the desired service name. I
        will ignore duplicate subscriptions.
        """
        # foolscap uses the returned object as the response constraint
        return None
81
class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
    # Combined publish+subscribe interface that the v1 introducer actually
    # implements, because foolscap cannot yet expose multiple interfaces on
    # one object. Method bodies are foolscap schema declarations: parameter
    # defaults are argument constraints, return values response constraints.
    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
    def get_version():
        # response: a dict mapping str keys to arbitrary values
        return DictOf(str, Any())
    def publish(announcement=Announcement):
        return None
    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
        return None
90
class IIntroducerClient(Interface):
    """I provide service introduction facilities for a node. I help nodes
    publish their services to the rest of the world, and I help them learn
    about services available on other nodes."""

    def publish(furl, service_name, remoteinterface_name):
        """Once you call this, I will tell the world that the Referenceable
        available at FURL is available to provide a service named
        SERVICE_NAME. The precise definition of the service being provided is
        identified by the Foolscap 'remote interface name' in the last
        parameter: this is supposed to be a globally-unique string that
        identifies the RemoteInterface that is implemented."""

    def subscribe_to(service_name, callback, *args, **kwargs):
        """Call this if you will eventually want to use services with the
        given SERVICE_NAME. This will prompt me to subscribe to announcements
        of those services. Your callback will be invoked with at least two
        arguments: a serverid (binary string), and an announcement
        dictionary, followed by any additional callback args/kwargs you give
        me. I will run your callback for both new announcements and for
        announcements that have changed, but you must be prepared to tolerate
        duplicates.

        The announcement dictionary that I give you will have the following
        keys:

         version: 0
         service-name: str('storage')

         FURL: str(furl)
         remoteinterface-name: str(ri_name)
         nickname: unicode
         app-versions: {}
         my-version: str
         oldest-supported: str

        Note that app-versions will be an empty dictionary until #466 is done
        and both the introducer and the remote client have been upgraded. For
        current (native) server types, the serverid will always be equal to
        the binary form of the FURL's tubid.
        """

    def connected_to_introducer():
        """Returns a boolean, True if we are currently connected to the
        introducer, False if not."""
136
137
class IntroducerClient_v1(service.Service, Referenceable):
    """Old (v1) introducer client, kept for compatibility tests.

    I keep a reconnecting connection to a v1 introducer, publish this node's
    announcements to it each time it connects, subscribe to the service names
    my local callers ask for, and turn incoming announcement tuples into
    dictionaries that I hand to those callers.
    """
    implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        # announcement tuples we (re-)publish whenever the introducer connects
        self._published_announcements = set()

        # the version-checked introducer RemoteReference, or None while we
        # are not connected
        self._publisher = None
        # last failure from the connect/version-check path (polled by tests);
        # initialized here so it exists even before startService() runs
        self._introducer_error = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores unpacked
        # announcement dictionaries, which can be compared for equality to
        # distinguish re-announcement from updates. It also provides memory
        # for clients who subscribe after startup.
        self._current_announcements = {}

        # set by the introducer through remote_set_encoding_parameters()
        self.encoding_parameters = None

        # hooks for unit tests
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """Deferred passthrough: count one fewer outstanding remote call."""
        self._debug_outstanding -= 1
        return res

    def startService(self):
        service.Service.startService(self)
        self._introducer_error = None
        # connectTo() keeps (re)connecting; _got_introducer runs on every
        # successful connection
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # a one-shot getReference() whose only visible use here is to log a
        # failed first connection attempt
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Fetch the introducer's version dict, then finish connecting."""
        self.log("connected to introducer, getting versions")
        # used when the remote side has no get_version() method
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = rrefutil.add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Record (and log) a failure to use the introducer."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests
        # also log it, otherwise e.g. an incompatible introducer fails silently
        self.log("unable to use introducer", failure=f, level=log.UNUSUAL)

    def _got_versioned_introducer(self, publisher):
        """Accept a v1 introducer and push our publications/subscriptions.

        Raises InsufficientVersionError if the v1 protocol key is missing.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        # forget the sent subscriptions so they are re-sent on reconnect
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Remember an announcement and send it if we are connected."""
        assert type(self._nickname_utf8) is str # we always send UTF-8
        ann = (furl, service_name, remoteinterface_name,
               self._nickname_utf8, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register cb for SERVICE_NAME announcements.

        cb is called as cb(nodeid, ann_d, *args, **kwargs), both for
        announcements that arrive later and (right away, via eventually())
        for matching announcements we already know about.
        """
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        for (servicename,nodeid),ann_d in self._current_announcements.items():
            if servicename == service_name:
                # pass the caller's extra args too, exactly as
                # _process_announcement does for new announcements
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def _maybe_subscribe(self):
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                self._debug_outstanding += 1
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addBoth(self._debug_retired)
                d.addErrback(rrefutil.trap_deadref)
                d.addErrback(log.err, format="server errored during subscribe",
                             facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_outstanding += 1
            d = self._publisher.callRemote("publish", ann)
            d.addBoth(self._debug_retired)
            d.addErrback(rrefutil.trap_deadref)
            d.addErrback(log.err,
                         format="server errored during publish %(ann)s",
                         ann=ann, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")



    def remote_announce(self, announcements):
        """Process each announcement tuple the introducer sends us."""
        self.log("received %d announcements" % len(announcements))
        self._debug_counts["inbound_message"] += 1
        for ann in announcements:
            try:
                self._process_announcement(ann)
            except Exception:
                # Don't let a corrupt announcement prevent us from processing
                # the remaining ones. Don't return an error to the server,
                # since they'd just ignore it anyways.
                log.err(format="unable to process announcement %(ann)s",
                        ann=ann, facility="tahoe.introducer")

    def _process_announcement(self, ann):
        """Validate one announcement tuple and deliver it to subscribers.

        Raises (for remote_announce to log) if the tuple is malformed.
        """
        self._debug_counts["inbound_announcement"] += 1
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        self.log("announcement for [%s]: %s" % (service_name, ann),
                 umid="BoKEag")
        # These fields come from a remote peer: check them explicitly, since
        # assert statements disappear under "python -O".
        for (field, value) in (("FURL", furl),
                               ("service_name", service_name),
                               ("remoteinterface_name", ri_name),
                               ("nickname", nickname_utf8),
                               ("my_version", ver),
                               ("oldest_supported", oldest)):
            if type(value) is not str:
                raise TypeError("announcement field %s must be str, not %r"
                                % (field, type(value)))
        nickname = nickname_utf8.decode("utf-8") # may raise UnicodeDecodeError

        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        nodeid_s = idlib.shortnodeid_b2a(nodeid)

        ann_d = { "version": 0,
                  "service-name": service_name,

                  "FURL": furl,
                  "remoteinterface-name": ri_name,
                  "nickname": nickname,
                  "app-versions": {}, # need #466 and v2 introducer
                  "my-version": ver,
                  "oldest-supported": oldest,
                  }

        index = (service_name, nodeid)
        if self._current_announcements.get(index, None) == ann_d:
            # format= so that foolscap interpolates the named fields
            self.log(format="reannouncement for [%(service)s]:%(nodeid)s, ignoring",
                     service=service_name, nodeid=nodeid_s,
                     level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        if index in self._current_announcements:
            self._debug_counts["update"] += 1
        else:
            self._debug_counts["new_announcement"] += 1

        self._current_announcements[index] = ann_d
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        return bool(self._publisher)
353
class IntroducerService_v1(service.MultiService, Referenceable):
    """Old (v1) introducer server, kept so compatibility tests can run new
    clients against it.

    Publishers hand me announcement tuples. I keep the latest one per
    (service_name, nodeid), forward each new or changed announcement to the
    current subscribers of that service name, and send every new subscriber
    all the announcements I already hold for its service name.
    """
    implements(RIIntroducerPublisherAndSubscriberService_v1)
    name = "introducer"
    VERSION = {
        "http://allmydata.org/tahoe/protocols/introducer/v1": {},
        "application-version": str(allmydata.__full_version__),
        }

    def __init__(self, basedir="."):
        service.MultiService.__init__(self)
        self.introducer_url = None
        # (service_name, nodeid) -> (announcement tuple, time received)
        self._announcements = {}
        # service_name -> {subscriber RemoteReference: time subscribed}
        self._subscribers = {}
        counter_names = ("inbound_message", "inbound_duplicate",
                         "inbound_update", "outbound_message",
                         "outbound_announcements", "inbound_subscribe")
        self._debug_counts = dict.fromkeys(counter_names, 0)
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        # Deferred passthrough: one fewer remote call in flight
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        kwargs.setdefault("facility", "tahoe.introducer")
        return log.msg(*args, **kwargs)

    def get_announcements(self, include_stub_clients=True):
        """Return an AnnouncementDescriptor for every stored announcement,
        optionally leaving out the "stub_client" ones."""
        results = []
        for (index, (ann_t, when)) in self._announcements.items():
            (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
            if not include_stub_clients and service_name == "stub_client":
                continue
            ann_d = {"nickname": nickname.decode("utf-8", "replace"),
                     "my-version": ver,
                     "service-name": service_name,
                     "anonymous-storage-FURL": furl,
                     }
            # the V2 introducer indexes by (service_name, key_s, tubid_s);
            # mirror that shape so AnnouncementDescriptor sees one format
            tubid_s = idlib.nodeid_b2a(index[1])
            v2_index = (index[0], None, tubid_s)
            results.append(AnnouncementDescriptor(when, v2_index, None, ann_d))
        return results

    def get_subscribers(self):
        """Return a SubscriberDescriptor per (service_name, subscriber);
        v1 subscribers do not tell us their nickname or versions."""
        descriptors = []
        for (service_name, subscribers) in self._subscribers.items():
            for (rref, when) in subscribers.items():
                tubid = rref.getRemoteTubID() or "?"
                address = rrefutil.stringify_remote_address(rref)
                descriptors.append(SubscriberDescriptor(service_name, when,
                                                        u"?", u"?", {},
                                                        address, tubid))
        return descriptors

    def remote_get_version(self):
        return self.VERSION

    def remote_publish(self, announcement):
        try:
            self._publish(announcement)
        except:
            # log, then let the error propagate back to the caller
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
            raise

    def _publish(self, announcement):
        """Store an announcement; forward it to subscribers if new/changed."""
        counts = self._debug_counts
        counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (announcement,))
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement

        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        index = (service_name, nodeid)

        previous = self._announcements.get(index)
        if previous is not None:
            if previous[0] == announcement:
                self.log("but we already knew it, ignoring", level=log.NOISY)
                counts["inbound_duplicate"] += 1
                return
            self.log("old announcement being updated", level=log.NOISY)
            counts["inbound_update"] += 1
        self._announcements[index] = (announcement, time.time())

        for subscriber in self._subscribers.get(service_name, []):
            counts["outbound_message"] += 1
            counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce", set([announcement]))
            d.addBoth(self._debug_retired)
            d.addErrback(rrefutil.trap_deadref)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=announcement, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe(self, subscriber, service_name):
        """Add a subscriber (duplicates are ignored), arrange for its removal
        on disconnect, and send it everything stored for service_name."""
        self.log("introducer: subscription[%s] request at %s"
                 % (service_name, subscriber))
        self._debug_counts["inbound_subscribe"] += 1
        subscribers = self._subscribers.setdefault(service_name, {})
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL)
            return
        subscribers[subscriber] = time.time()

        def _unsubscribe():
            self.log("introducer: unsubscribing[%s] %s"
                     % (service_name, subscriber))
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_unsubscribe)

        matching = set(ann
                       for ((sn, _nodeid), (ann, _when))
                       in self._announcements.items()
                       if sn == service_name)

        self._debug_counts["outbound_message"] += 1
        self._debug_counts["outbound_announcements"] += len(matching)
        self._debug_outstanding += 1
        d = subscriber.callRemote("announce", matching)
        d.addBoth(self._debug_retired)
        d.addErrback(rrefutil.trap_deadref)
        d.addErrback(log.err,
                     format="subscriber errored during subscribe %(anns)s",
                     anns=matching, facility="tahoe.introducer",
                     level=log.UNUSUAL, umid="1XChxA")
489                      format="subscriber errored during subscribe %(anns)s",
490                      anns=announcements, facility="tahoe.introducer",
491                      level=log.UNUSUAL, umid="1XChxA")