# src/allmydata/introducer/client.py
# (retrieved from the tahoe-lafs git repository, rendered by a git web
# viewer; commit message: "#620: storage: allow mutable shares to be
# deleted, with a writev where new_length=0")
1
2 import re, time, sha
3 from base64 import b32decode
4 from zope.interface import implements
5 from twisted.application import service
6 from foolscap import Referenceable
7 from allmydata.interfaces import InsufficientVersionError
8 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
9      IIntroducerClient
10 from allmydata.util import log, idlib
11 from allmydata.util.rrefutil import get_versioned_remote_reference
12 from allmydata.introducer.common import make_index
13
14
class RemoteServiceConnector:
    """Tracks one remote peer service and manages our connection to it.

    While connected I expose the peer's RemoteReference, its network
    address, and the version information it reported. Timing details about
    the connection history are remembered even when we are not currently
    connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the peer's version, from the most recent announcement
    @ivar oldest_supported: the peer's oldest supported version, same
    @ivar nickname: the peer's self-reported nickname, same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """

    # Fallback version dictionaries, substituted when the remote side is
    # too old to implement get_version() itself.
    VERSION_DEFAULTS = {
        "storage": {
            "http://allmydata.org/tahoe/protocols/storage/v1": {
                "maximum-immutable-share-size": 2**32,
                "tolerates-immutable-read-overrun": False,
                "delete-mutable-shares-with-zero-length-writev": False,
            },
            "application-version": "unknown: no get_version()",
        },
        "stub_client": {},
    }

    def __init__(self, announcement, tub, ic):
        furl, service_name, ri_name, nickname, ver, oldest = announcement
        self._announcement = announcement
        self._tub = tub
        self._ic = ic
        self._furl = furl

        # The peer's nodeid is the base32-encoded tubid embedded in the FURL.
        match = re.match(r'pb://(\w+)@', furl)
        assert match
        self._nodeid = b32decode(match.group(1).upper())
        self._nodeid_s = idlib.shortnodeid_b2a(self._nodeid)

        self.service_name = service_name

        self.log("attempting to connect to %s" % self._nodeid_s)
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.version = ver
        self.oldest_supported = oldest
        self.nickname = nickname
        self.rref = None
        self.remote_host = None

    def log(self, *args, **kwargs):
        # Delegate all logging to the IntroducerClient that owns us.
        return self._ic.log(*args, **kwargs)

    def startConnecting(self):
        """Begin (and keep retrying) the connection to the peer's FURL."""
        self._reconnector = self._tub.connectTo(self._furl, self._got_service)

    def stopConnecting(self):
        """Stop the Reconnector: no further connection attempts are made."""
        self._reconnector.stopConnecting()

    def _got_service(self, rref):
        # A raw connection is up; wrap it with version information before
        # handing it to the IntroducerClient.
        self.log("got connection to %s, getting versions" % self._nodeid_s)

        defaults = self.VERSION_DEFAULTS.get(self.service_name, {})
        d = get_versioned_remote_reference(rref, defaults)
        d.addCallback(self._got_versioned_service)

    def _got_versioned_service(self, rref):
        # Record the live connection and register for disconnect
        # notification so bookkeeping can be undone in _lost.
        self.log("connected to %s, version %s" % (self._nodeid_s, rref.version))

        self.last_connect_time = time.time()
        self.remote_host = rref.rref.tracker.broker.transport.getPeer()

        self.rref = rref

        self._ic.add_connection(self._nodeid, self.service_name, rref)

        rref.notifyOnDisconnect(self._lost, rref)

    def _lost(self, rref):
        # Undo the bookkeeping done in _got_versioned_service.
        self.log("lost connection to %s" % self._nodeid_s)
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None
        self._ic.remove_connection(self._nodeid, self.service_name, rref)

    def reset(self):
        """Prod the Reconnector into attempting a new connection promptly."""
        self._reconnector.reset()
107
class IntroducerClient(service.Service, Referenceable):
    """I am the client side of the introducer protocol.

    I maintain a connection to the introducer, publish announcements about
    the services my node offers, subscribe to announcements about service
    types we care about, and create a RemoteServiceConnector for each
    remote service we hear about.
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        self._tub = tub
        self.introducer_furl = introducer_furl

        self._nickname = nickname.encode("utf-8")
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        # announcements we have published; re-sent on each (re)connect
        self._published_announcements = set()

        self._publisher = None
        self._connected = False

        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent
        self._received_announcements = set()
        # TODO: this set will grow without bound, until the node is restarted

        # we only accept one announcement per (peerid+service_name) pair.
        # This insures that an upgraded host replace their previous
        # announcement. It also means that each peer must have their own Tub
        # (no sharing), which is slightly weird but consistent with the rest
        # of the Tahoe codebase.
        self._connectors = {} # k: (peerid+svcname), v: RemoteServiceConnector
        # self._connections is a set of (peerid, service_name, rref) tuples
        self._connections = set()

        self.counter = 0 # incremented each time we change state, for tests
        self.encoding_parameters = None

    def startService(self):
        """Begin connecting (and reconnecting) to the introducer."""
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        # the Reconnector does not report the initial failure to us, so make
        # a separate one-shot attempt purely to log that failure
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        # fetch the introducer's version information before using it
        self.log("connected to introducer, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._connected = True
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        # clear the subscriptions so they are re-sent on reconnect
        self.log("bummer, we've lost our connection to the introducer")
        self._connected = False
        self._publisher = None
        self._subscriptions.clear()

    def stopService(self):
        """Stop reconnecting to the introducer and to all known peers."""
        service.Service.stopService(self)
        self._introducer_reconnector.stopConnecting()
        for rsc in self._connectors.itervalues():
            rsc.stopConnecting()

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer"
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Announce a local service; re-announced on each (re)connect."""
        ann = (furl, service_name, remoteinterface_name,
               self._nickname, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name):
        """Express interest in announcements for the given service type."""
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()

    def _maybe_subscribe(self):
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(log.err, facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(log.err, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")



    def remote_announce(self, announcements):
        """Called by the introducer with a batch of announcements."""
        # log the batch size once, not once per announcement (the old code
        # repeated this line inside the loop, emitting it N times)
        self.log("received %d announcements" % len(announcements))
        for ann in announcements:
            (furl, service_name, ri_name, nickname, ver, oldest) = ann
            if service_name not in self._subscribed_service_names:
                self.log("announcement for a service we don't care about [%s]"
                         % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
                continue
            if ann in self._received_announcements:
                self.log("ignoring old announcement: %s" % (ann,),
                         level=log.NOISY)
                continue
            self.log("new announcement[%s]: %s" % (service_name, ann))
            self._received_announcements.add(ann)
            self._new_announcement(ann)

    def _new_announcement(self, announcement):
        # this will only be called for new announcements
        index = make_index(announcement)
        if index in self._connectors:
            self.log("replacing earlier announcement", level=log.NOISY)
            self._connectors[index].stopConnecting()
        rsc = RemoteServiceConnector(announcement, self._tub, self)
        self._connectors[index] = rsc
        rsc.startConnecting()

    def add_connection(self, nodeid, service_name, rref):
        """Record a newly-established connection to a peer service."""
        self._connections.add( (nodeid, service_name, rref) )
        self.counter += 1
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for rsc in self._connectors.values():
            rsc.reset()

    def remove_connection(self, nodeid, service_name, rref):
        """Forget a connection that has been lost."""
        self._connections.discard( (nodeid, service_name, rref) )
        self.counter += 1


    def get_all_connections(self):
        return frozenset(self._connections)

    def get_all_connectors(self):
        return self._connectors.copy()

    def get_all_peerids(self):
        return frozenset([peerid
                          for (peerid, service_name, rref)
                          in self._connections])

    def get_nickname_for_peerid(self, peerid):
        """Return the announced nickname of peerid, or None if unknown."""
        for k in self._connectors:
            (peerid0, svcname0) = k
            if peerid0 == peerid:
                rsc = self._connectors[k]
                return rsc.nickname
        return None

    def get_all_connections_for(self, service_name):
        return frozenset([c
                          for c in self._connections
                          if c[1] == service_name])

    def get_peers(self, service_name):
        """Return a set of (peerid, versioned-rref) tuples."""
        return frozenset([(peerid, r)
                          for (peerid, servname, r) in self._connections
                          if servname == service_name])

    def get_permuted_peers(self, service_name, key):
        """Return an ordered list of (peerid, versioned-rref) tuples."""
        servers = self.get_peers(service_name)
        # permute the serverlist by hashing each peerid with the key
        return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())

    def remote_set_encoding_parameters(self, parameters):
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        return self._connected

    def debug_disconnect_from_peerid(self, victim_nodeid):
        # for unit tests: locate and sever all connections to the given
        # peerid.
        for (nodeid, service_name, rref) in self._connections:
            if nodeid == victim_nodeid:
                rref.tracker.broker.transport.loseConnection()