
"""
I contain the client-side code which speaks to storage servers, in particular
the foolscap-based server implemented in src/allmydata/storage/*.py .
"""

# roadmap:
#
# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
# to clients. webapi status pages call broker.get_info_about_serverid.
#
# 2: move get_info methods to the descriptor, webapi status pages call
# broker.get_descriptor_for_serverid().get_info
#
# 3?later?: store descriptors in UploadResults/etc instead of serverids,
# webapi status pages call descriptor.get_info and don't use storage_broker
# or Client
#
# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
# optional. This closes #467
#
# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
# clients. Clients stop doing callRemote(), use NativeStorageClient methods
# instead (which might do something else, i.e. http or whatever). The
# introducer and tahoe.cfg only create NativeStorageClients for now.
#
# 6: implement other sorts of IStorageClient classes: S3, etc


import re, time
from zope.interface import implements
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1

# who is responsible for de-duplication?
#  both?
#  IC remembers the unpacked announcements it receives, to provide for late
#  subscribers and to remove duplicates

# if a client subscribes after startup, will they receive old announcements?
#  yes

# who will be responsible for signature checking?
#  make it be IntroducerClient, so they can push the filter outwards and
#  reduce inbound network traffic

# what should the interface between StorageFarmBroker and IntroducerClient
# look like?
#  don't pass signatures: only pass validated blessed-objects

class StorageFarmBroker:
    implements(IStorageBroker)
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    (An illustrative construction sketch appears after this class.)
    """
    def __init__(self, tub, permute_peers, connected_threshold, connected_d,
                 preferred_peers=()):
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        self.connected_threshold = connected_threshold
        self.connected_d = connected_d
        self.preferred_peers = preferred_peers
        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # it for one.
        self.servers = {}
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        s = NativeStorageServer(serverid, ann.copy(), self)
        s.rref = rref
        s._is_connected = True
        self.servers[serverid] = s

    def test_add_server(self, serverid, s):
        self.servers[serverid] = s

    def use_introducer(self, introducer_client):
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, key_s, ann):
        if key_s is not None:
            precondition(isinstance(key_s, str), key_s)
            precondition(key_s.startswith("v0-"), key_s)
        assert ann["service-name"] == "storage"
        s = NativeStorageServer(key_s, ann, self)
        serverid = s.get_serverid()
        old = self.servers.get(serverid)
        if old:
            if old.get_announcement() == ann:
                return # duplicate
            # replacement
            del self.servers[serverid]
            old.stop_connecting()
            # now we forget about the old one and start using the new one
        self.servers[serverid] = s
        s.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage its own Reconnector, and each time we
        # need servers, we'll ask it whether it's connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            dsc.try_to_connect()

    def check_enough_connected(self):
        if (self.connected_d is not None and
            len(self.get_connected_servers()) >= self.connected_threshold):
            d = self.connected_d
            self.connected_d = None
            d.callback(None)

    def get_servers_for_psi(self, peer_selection_index):
        # return a list of server objects (IServers), preferred servers
        # first, each group permuted by SHA1 of the peer-selection index and
        # each server's permutation seed. (An illustrative standalone sketch
        # of this ordering appears after the StubServer class below.)
        assert self.permute_peers == True
        connected_servers = self.get_connected_servers()
        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
        def _permuted(server):
            seed = server.get_permutation_seed()
            is_unpreferred = server not in preferred_servers
            return (is_unpreferred, sha1(peer_selection_index + seed).digest())
        return sorted(connected_servers, key=_permuted)

    def get_all_serverids(self):
        return frozenset(self.servers.keys())

    def get_connected_servers(self):
        return frozenset([s for s in self.servers.values() if s.is_connected()])

    def get_known_servers(self):
        return frozenset(self.servers.values())

    def get_nickname_for_serverid(self, serverid):
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None

    def get_stub_server(self, serverid):
        if serverid in self.servers:
            return self.servers[serverid]
        return StubServer(serverid)

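# Illustrative only (not part of the original interface): a minimal sketch of
# how a client might construct a StorageFarmBroker and wait for the
# connection threshold, assuming a foolscap `tub` and an `introducer_client`
# have already been created elsewhere (as allmydata.client.Client does during
# startup). The helper name `_example_wire_up_broker` and the threshold value
# are made up for illustration.
def _example_wire_up_broker(tub, introducer_client):
    from twisted.internet import defer
    connected_d = defer.Deferred()
    broker = StorageFarmBroker(tub, permute_peers=True,
                               connected_threshold=3,
                               connected_d=connected_d)
    broker.use_introducer(introducer_client)
    # connected_d fires (with None) once at least connected_threshold servers
    # are connected; see check_enough_connected() above.
    connected_d.addCallback(lambda ign: log.msg("connected to enough servers"))
    return broker
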
class StubServer:
    implements(IDisplayableServer)
    def __init__(self, serverid):
        self.serverid = serverid # binary tubid
    def get_serverid(self):
        return self.serverid
    def get_name(self):
        return base32.b2a(self.serverid)[:8]
    def get_longname(self):
        return base32.b2a(self.serverid)
    def get_nickname(self):
        return "?"

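# Illustrative only: a standalone sketch of the permuted ordering that
# StorageFarmBroker.get_servers_for_psi() computes. Preferred servers sort
# first; within each group, servers are ordered by the SHA1 hash of the
# peer-selection index concatenated with each server's permutation seed.
# The helper name and its arguments are made up for illustration; it operates
# on a plain dict mapping serverid -> permutation seed rather than on IServer
# objects.
def _example_permuted_order(peer_selection_index, seeds_by_serverid,
                            preferred_serverids=frozenset()):
    def _key(item):
        (serverid, seed) = item
        is_unpreferred = serverid not in preferred_serverids
        return (is_unpreferred, sha1(peer_selection_index + seed).digest())
    return [serverid for (serverid, seed)
            in sorted(seeds_by_serverid.items(), key=_key)]
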
class NativeStorageServer:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the server's versiondict, from the most recent announcement
    @ivar nickname: the server's self-reported nickname (unicode), also from
                    the most recent announcement

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    implements(IServer)

    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32 - 1,
          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          "available-space": None,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, key_s, ann, broker):
        self.key_s = key_s
        self.announcement = ann
        self.broker = broker

        assert "anonymous-storage-FURL" in ann, ann
        furl = str(ann["anonymous-storage-FURL"])
        m = re.match(r'pb://(\w+)@', furl)
        assert m, furl
        tubid_s = m.group(1).lower()
        self._tubid = base32.a2b(tubid_s)
        assert "permutation-seed-base32" in ann, ann
        ps = base32.a2b(str(ann["permutation-seed-base32"]))
        self._permutation_seed = ps

        if key_s:
            self._long_description = key_s
            if key_s.startswith("v0-"):
                # remove v0- prefix from abbreviated name
                self._short_description = key_s[3:3+8]
            else:
                self._short_description = key_s[:8]
        else:
            self._long_description = tubid_s
            self._short_description = tubid_s[:6]

        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them.
    def __copy__(self):
        return self
    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<NativeStorageServer for %s>" % self.get_name()
    def get_serverid(self):
        return self._tubid # XXX replace with self.key_s
    def get_permutation_seed(self):
        return self._permutation_seed
    def get_version(self):
        if self.rref:
            return self.rref.version
        return None
    def get_name(self): # keep methodname short
        # TODO: decide who adds [] in the short description. It should
        # probably be the output side, not here.
        return self._short_description
    def get_longname(self):
        return self._long_description
    def get_lease_seed(self):
        return self._tubid
    def get_foolscap_write_enabler_seed(self):
        return self._tubid

    def get_nickname(self):
        return self.announcement["nickname"]
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def is_connected(self):
        return self._is_connected
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time

    def get_available_space(self):
        # if the server does not report available-space, fall back to its
        # maximum immutable share size. (An illustrative standalone sketch of
        # this fallback appears at the end of this module.)
        version = self.get_version()
        if version is None:
            return None
        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
        available_space = protocol_v1_version.get('available-space')
        if available_space is None:
            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
        return available_space

    def start_connecting(self, tub, trigger_cb):
        furl = str(self.announcement["anonymous-storage-FURL"])
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addCallback(lambda ign: self.broker.check_enough_connected())
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        return self.rref

    def _lost(self):
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self.rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        # (An illustrative sketch of that calling pattern appears after this
        # class.)
        self._is_connected = False
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        self._reconnector.reset()

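# Illustrative only: a sketch of the calling pattern that the comment in
# _lost() above is protecting. Uploader/downloader code holds an IServer and
# calls through its (possibly stale) RemoteReference; if the connection has
# been lost, the callRemote() fails with DeadReferenceError instead of
# crashing on a None rref. The helper name and the simplified error handling
# are made up for illustration.
def _example_call_through_server(server):
    from foolscap.api import DeadReferenceError
    d = server.get_rref().callRemote("get_version")
    def _lost_it(f):
        f.trap(DeadReferenceError)
        return None # treat a lost server as having no answer
    d.addErrback(_lost_it)
    return d
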
class UnknownServerTypeError(Exception):
    pass
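
# Illustrative only: a standalone sketch of the fallback that
# NativeStorageServer.get_available_space() applies to a server's version
# dict. If the v1 storage-protocol block does not report 'available-space',
# the older 'maximum-immutable-share-size' field is used as an approximation.
# The helper name is made up for illustration.
def _example_available_space(versiondict):
    v1 = versiondict.get("http://allmydata.org/tahoe/protocols/storage/v1", {})
    space = v1.get("available-space")
    if space is None:
        space = v1.get("maximum-immutable-share-size", None)
    return space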