]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
e532db1a68d8658d60d06cd347c25f90ba6b2fa0
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32 import re, time
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
40
41 # who is responsible for de-duplication?
42 #  both?
43 #  IC remembers the unpacked announcements it receives, to provide for late
44 #  subscribers and to remove duplicates
45
46 # if a client subscribes after startup, will they receive old announcements?
47 #  yes
48
49 # who will be responsible for signature checking?
50 #  make it be IntroducerClient, so they can push the filter outwards and
51 #  reduce inbound network traffic
52
53 # what should the interface between StorageFarmBroker and IntroducerClient
54 # look like?
55 #  don't pass signatures: only pass validated blessed-objects
56
57 class StorageFarmBroker:
58     implements(IStorageBroker)
59     """I live on the client, and know about storage servers. For each server
60     that is participating in a grid, I either maintain a connection to it or
61     remember enough information to establish a connection to it on demand.
62     I'm also responsible for subscribing to the IntroducerClient to find out
63     about new servers as they are announced by the Introducer.
64     """
65     def __init__(self, tub, permute_peers):
66         self.tub = tub
67         assert permute_peers # False not implemented yet
68         self.permute_peers = permute_peers
69         # self.servers maps serverid -> IServer, and keeps track of all the
70         # storage servers that we've heard about. Each descriptor manages its
71         # own Reconnector, and will give us a RemoteReference when we ask
72         # them for it.
73         self.servers = {}
74         self.introducer_client = None
75
76     # these two are used in unit tests
77     def test_add_rref(self, serverid, rref, ann):
78         s = NativeStorageServer(serverid, ann.copy())
79         s.rref = rref
80         s._is_connected = True
81         self.servers[serverid] = s
82
83     def test_add_server(self, serverid, s):
84         self.servers[serverid] = s
85
86     def use_introducer(self, introducer_client):
87         self.introducer_client = ic = introducer_client
88         ic.subscribe_to("storage", self._got_announcement)
89
90     def _got_announcement(self, key_s, ann):
91         if key_s is not None:
92             precondition(isinstance(key_s, str), key_s)
93             precondition(key_s.startswith("v0-"), key_s)
94         assert ann["service-name"] == "storage"
95         s = NativeStorageServer(key_s, ann)
96         serverid = s.get_serverid()
97         old = self.servers.get(serverid)
98         if old:
99             if old.get_announcement() == ann:
100                 return # duplicate
101             # replacement
102             del self.servers[serverid]
103             old.stop_connecting()
104             # now we forget about them and start using the new one
105         self.servers[serverid] = s
106         s.start_connecting(self.tub, self._trigger_connections)
107         # the descriptor will manage their own Reconnector, and each time we
108         # need servers, we'll ask them if they're connected or not.
109
110     def _trigger_connections(self):
111         # when one connection is established, reset the timers on all others,
112         # to trigger a reconnection attempt in one second. This is intended
113         # to accelerate server connections when we've been offline for a
114         # while. The goal is to avoid hanging out for a long time with
115         # connections to only a subset of the servers, which would increase
116         # the chances that we'll put shares in weird places (and not update
117         # existing shares of mutable files). See #374 for more details.
118         for dsc in self.servers.values():
119             dsc.try_to_connect()
120
121     def get_servers_for_psi(self, peer_selection_index):
122         # return a list of server objects (IServers)
123         assert self.permute_peers == True
124         def _permuted(server):
125             seed = server.get_permutation_seed()
126             return sha1(peer_selection_index + seed).digest()
127         return sorted(self.get_connected_servers(), key=_permuted)
128
129     def get_all_serverids(self):
130         return frozenset(self.servers.keys())
131
132     def get_connected_servers(self):
133         return frozenset([s for s in self.servers.values() if s.is_connected()])
134
135     def get_known_servers(self):
136         return frozenset(self.servers.values())
137
138     def get_nickname_for_serverid(self, serverid):
139         if serverid in self.servers:
140             return self.servers[serverid].get_nickname()
141         return None
142
143     def get_stub_server(self, serverid):
144         if serverid in self.servers:
145             return self.servers[serverid]
146         return StubServer(serverid)
147
148 class StubServer:
149     implements(IDisplayableServer)
150     def __init__(self, serverid):
151         self.serverid = serverid # binary tubid
152     def get_serverid(self):
153         return self.serverid
154     def get_name(self):
155         return base32.b2a(self.serverid)[:8]
156     def get_longname(self):
157         return base32.b2a(self.serverid)
158     def get_nickname(self):
159         return "?"
160
161 class NativeStorageServer:
162     """I hold information about a storage server that we want to connect to.
163     If we are connected, I hold the RemoteReference, their host address, and
164     the their version information. I remember information about when we were
165     last connected too, even if we aren't currently connected.
166
167     @ivar announcement_time: when we first heard about this service
168     @ivar last_connect_time: when we last established a connection
169     @ivar last_loss_time: when we last lost a connection
170
171     @ivar version: the server's versiondict, from the most recent announcement
172     @ivar nickname: the server's self-reported nickname (unicode), same
173
174     @ivar rref: the RemoteReference, if connected, otherwise None
175     @ivar remote_host: the IAddress, if connected, otherwise None
176     """
177     implements(IServer)
178
179     VERSION_DEFAULTS = {
180         "http://allmydata.org/tahoe/protocols/storage/v1" :
181         { "maximum-immutable-share-size": 2**32 - 1,
182           "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
183           "tolerates-immutable-read-overrun": False,
184           "delete-mutable-shares-with-zero-length-writev": False,
185           "available-space": None,
186           },
187         "application-version": "unknown: no get_version()",
188         }
189
190     def __init__(self, key_s, ann):
191         self.key_s = key_s
192         self.announcement = ann
193
194         assert "anonymous-storage-FURL" in ann, ann
195         furl = str(ann["anonymous-storage-FURL"])
196         m = re.match(r'pb://(\w+)@', furl)
197         assert m, furl
198         tubid_s = m.group(1).lower()
199         self._tubid = base32.a2b(tubid_s)
200         assert "permutation-seed-base32" in ann, ann
201         ps = base32.a2b(str(ann["permutation-seed-base32"]))
202         self._permutation_seed = ps
203
204         if key_s:
205             self._long_description = key_s
206             if key_s.startswith("v0-"):
207                 # remove v0- prefix from abbreviated name
208                 self._short_description = key_s[3:3+8]
209             else:
210                 self._short_description = key_s[:8]
211         else:
212             self._long_description = tubid_s
213             self._short_description = tubid_s[:6]
214
215         self.announcement_time = time.time()
216         self.last_connect_time = None
217         self.last_loss_time = None
218         self.remote_host = None
219         self.rref = None
220         self._is_connected = False
221         self._reconnector = None
222         self._trigger_cb = None
223
224     # Special methods used by copy.copy() and copy.deepcopy(). When those are
225     # used in allmydata.immutable.filenode to copy CheckResults during
226     # repair, we want it to treat the IServer instances as singletons, and
227     # not attempt to duplicate them..
228     def __copy__(self):
229         return self
230     def __deepcopy__(self, memodict):
231         return self
232
233     def __repr__(self):
234         return "<NativeStorageServer for %s>" % self.get_name()
235     def get_serverid(self):
236         return self._tubid # XXX replace with self.key_s
237     def get_permutation_seed(self):
238         return self._permutation_seed
239     def get_version(self):
240         if self.rref:
241             return self.rref.version
242         return None
243     def get_name(self): # keep methodname short
244         # TODO: decide who adds [] in the short description. It should
245         # probably be the output side, not here.
246         return self._short_description
247     def get_longname(self):
248         return self._long_description
249     def get_lease_seed(self):
250         return self._tubid
251     def get_foolscap_write_enabler_seed(self):
252         return self._tubid
253
254     def get_nickname(self):
255         return self.announcement["nickname"]
256     def get_announcement(self):
257         return self.announcement
258     def get_remote_host(self):
259         return self.remote_host
260     def is_connected(self):
261         return self._is_connected
262     def get_last_connect_time(self):
263         return self.last_connect_time
264     def get_last_loss_time(self):
265         return self.last_loss_time
266     def get_announcement_time(self):
267         return self.announcement_time
268
269     def get_available_space(self):
270         version = self.get_version()
271         if version is None:
272             return None
273         protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
274         available_space = protocol_v1_version.get('available-space')
275         if available_space is None:
276             available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
277         return available_space
278
279     def start_connecting(self, tub, trigger_cb):
280         furl = str(self.announcement["anonymous-storage-FURL"])
281         self._trigger_cb = trigger_cb
282         self._reconnector = tub.connectTo(furl, self._got_connection)
283
284     def _got_connection(self, rref):
285         lp = log.msg(format="got connection to %(name)s, getting versions",
286                      name=self.get_name(),
287                      facility="tahoe.storage_broker", umid="coUECQ")
288         if self._trigger_cb:
289             eventually(self._trigger_cb)
290         default = self.VERSION_DEFAULTS
291         d = add_version_to_remote_reference(rref, default)
292         d.addCallback(self._got_versioned_service, lp)
293         d.addErrback(log.err, format="storageclient._got_connection",
294                      name=self.get_name(), umid="Sdq3pg")
295
296     def _got_versioned_service(self, rref, lp):
297         log.msg(format="%(name)s provided version info %(version)s",
298                 name=self.get_name(), version=rref.version,
299                 facility="tahoe.storage_broker", umid="SWmJYg",
300                 level=log.NOISY, parent=lp)
301
302         self.last_connect_time = time.time()
303         self.remote_host = rref.getPeer()
304         self.rref = rref
305         self._is_connected = True
306         rref.notifyOnDisconnect(self._lost)
307
308     def get_rref(self):
309         return self.rref
310
311     def _lost(self):
312         log.msg(format="lost connection to %(name)s", name=self.get_name(),
313                 facility="tahoe.storage_broker", umid="zbRllw")
314         self.last_loss_time = time.time()
315         # self.rref is now stale: all callRemote()s will get a
316         # DeadReferenceError. We leave the stale reference in place so that
317         # uploader/downloader code (which received this IServer through
318         # get_connected_servers() or get_servers_for_psi()) can continue to
319         # use s.get_rref().callRemote() and not worry about it being None.
320         self._is_connected = False
321         self.remote_host = None
322
323     def stop_connecting(self):
324         # used when this descriptor has been superceded by another
325         self._reconnector.stopConnecting()
326
327     def try_to_connect(self):
328         # used when the broker wants us to hurry up
329         self._reconnector.reset()
330
331 class UnknownServerTypeError(Exception):
332     pass