]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
wui: improved columns in welcome page server list
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32 import re, time
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
40
41 # who is responsible for de-duplication?
42 #  both?
43 #  IC remembers the unpacked announcements it receives, to provide for late
44 #  subscribers and to remove duplicates
45
46 # if a client subscribes after startup, will they receive old announcements?
47 #  yes
48
49 # who will be responsible for signature checking?
50 #  make it be IntroducerClient, so they can push the filter outwards and
51 #  reduce inbound network traffic
52
53 # what should the interface between StorageFarmBroker and IntroducerClient
54 # look like?
55 #  don't pass signatures: only pass validated blessed-objects
56
57 class StorageFarmBroker:
58     implements(IStorageBroker)
59     """I live on the client, and know about storage servers. For each server
60     that is participating in a grid, I either maintain a connection to it or
61     remember enough information to establish a connection to it on demand.
62     I'm also responsible for subscribing to the IntroducerClient to find out
63     about new servers as they are announced by the Introducer.
64     """
65     def __init__(self, tub, permute_peers, preferred_peers=()):
66         self.tub = tub
67         assert permute_peers # False not implemented yet
68         self.permute_peers = permute_peers
69         self.preferred_peers = preferred_peers
70         # self.servers maps serverid -> IServer, and keeps track of all the
71         # storage servers that we've heard about. Each descriptor manages its
72         # own Reconnector, and will give us a RemoteReference when we ask
73         # them for it.
74         self.servers = {}
75         self.introducer_client = None
76
77     # these two are used in unit tests
78     def test_add_rref(self, serverid, rref, ann):
79         s = NativeStorageServer(serverid, ann.copy())
80         s.rref = rref
81         s._is_connected = True
82         self.servers[serverid] = s
83
84     def test_add_server(self, serverid, s):
85         self.servers[serverid] = s
86
87     def use_introducer(self, introducer_client):
88         self.introducer_client = ic = introducer_client
89         ic.subscribe_to("storage", self._got_announcement)
90
91     def _got_announcement(self, key_s, ann):
92         if key_s is not None:
93             precondition(isinstance(key_s, str), key_s)
94             precondition(key_s.startswith("v0-"), key_s)
95         assert ann["service-name"] == "storage"
96         s = NativeStorageServer(key_s, ann)
97         serverid = s.get_serverid()
98         old = self.servers.get(serverid)
99         if old:
100             if old.get_announcement() == ann:
101                 return # duplicate
102             # replacement
103             del self.servers[serverid]
104             old.stop_connecting()
105             # now we forget about them and start using the new one
106         self.servers[serverid] = s
107         s.start_connecting(self.tub, self._trigger_connections)
108         # the descriptor will manage their own Reconnector, and each time we
109         # need servers, we'll ask them if they're connected or not.
110
111     def _trigger_connections(self):
112         # when one connection is established, reset the timers on all others,
113         # to trigger a reconnection attempt in one second. This is intended
114         # to accelerate server connections when we've been offline for a
115         # while. The goal is to avoid hanging out for a long time with
116         # connections to only a subset of the servers, which would increase
117         # the chances that we'll put shares in weird places (and not update
118         # existing shares of mutable files). See #374 for more details.
119         for dsc in self.servers.values():
120             dsc.try_to_connect()
121
122     def get_servers_for_psi(self, peer_selection_index):
123         # return a list of server objects (IServers)
124         assert self.permute_peers == True
125         connected_servers = self.get_connected_servers()
126         preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
127         def _permuted(server):
128             seed = server.get_permutation_seed()
129             is_unpreferred = server not in preferred_servers
130             return (is_unpreferred, sha1(peer_selection_index + seed).digest())
131         return sorted(connected_servers, key=_permuted)
132
133     def get_all_serverids(self):
134         return frozenset(self.servers.keys())
135
136     def get_connected_servers(self):
137         return frozenset([s for s in self.servers.values() if s.is_connected()])
138
139     def get_known_servers(self):
140         return frozenset(self.servers.values())
141
142     def get_nickname_for_serverid(self, serverid):
143         if serverid in self.servers:
144             return self.servers[serverid].get_nickname()
145         return None
146
147     def get_stub_server(self, serverid):
148         if serverid in self.servers:
149             return self.servers[serverid]
150         return StubServer(serverid)
151
152 class StubServer:
153     implements(IDisplayableServer)
154     def __init__(self, serverid):
155         self.serverid = serverid # binary tubid
156     def get_serverid(self):
157         return self.serverid
158     def get_name(self):
159         return base32.b2a(self.serverid)[:8]
160     def get_longname(self):
161         return base32.b2a(self.serverid)
162     def get_nickname(self):
163         return "?"
164
165 class NativeStorageServer:
166     """I hold information about a storage server that we want to connect to.
167     If we are connected, I hold the RemoteReference, their host address, and
168     the their version information. I remember information about when we were
169     last connected too, even if we aren't currently connected.
170
171     @ivar last_connect_time: when we last established a connection
172     @ivar last_loss_time: when we last lost a connection
173
174     @ivar version: the server's versiondict, from the most recent announcement
175     @ivar nickname: the server's self-reported nickname (unicode), same
176
177     @ivar rref: the RemoteReference, if connected, otherwise None
178     @ivar remote_host: the IAddress, if connected, otherwise None
179     """
180     implements(IServer)
181
182     VERSION_DEFAULTS = {
183         "http://allmydata.org/tahoe/protocols/storage/v1" :
184         { "maximum-immutable-share-size": 2**32 - 1,
185           "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
186           "tolerates-immutable-read-overrun": False,
187           "delete-mutable-shares-with-zero-length-writev": False,
188           "available-space": None,
189           },
190         "application-version": "unknown: no get_version()",
191         }
192
193     def __init__(self, key_s, ann):
194         self.key_s = key_s
195         self.announcement = ann
196
197         assert "anonymous-storage-FURL" in ann, ann
198         furl = str(ann["anonymous-storage-FURL"])
199         m = re.match(r'pb://(\w+)@', furl)
200         assert m, furl
201         tubid_s = m.group(1).lower()
202         self._tubid = base32.a2b(tubid_s)
203         assert "permutation-seed-base32" in ann, ann
204         ps = base32.a2b(str(ann["permutation-seed-base32"]))
205         self._permutation_seed = ps
206
207         if key_s:
208             self._long_description = key_s
209             if key_s.startswith("v0-"):
210                 # remove v0- prefix from abbreviated name
211                 self._short_description = key_s[3:3+8]
212             else:
213                 self._short_description = key_s[:8]
214         else:
215             self._long_description = tubid_s
216             self._short_description = tubid_s[:6]
217
218         self.last_connect_time = None
219         self.last_loss_time = None
220         self.remote_host = None
221         self.rref = None
222         self._is_connected = False
223         self._reconnector = None
224         self._trigger_cb = None
225
226     # Special methods used by copy.copy() and copy.deepcopy(). When those are
227     # used in allmydata.immutable.filenode to copy CheckResults during
228     # repair, we want it to treat the IServer instances as singletons, and
229     # not attempt to duplicate them..
230     def __copy__(self):
231         return self
232     def __deepcopy__(self, memodict):
233         return self
234
235     def __repr__(self):
236         return "<NativeStorageServer for %s>" % self.get_name()
237     def get_serverid(self):
238         return self._tubid # XXX replace with self.key_s
239     def get_permutation_seed(self):
240         return self._permutation_seed
241     def get_version(self):
242         if self.rref:
243             return self.rref.version
244         return None
245     def get_name(self): # keep methodname short
246         # TODO: decide who adds [] in the short description. It should
247         # probably be the output side, not here.
248         return self._short_description
249     def get_longname(self):
250         return self._long_description
251     def get_lease_seed(self):
252         return self._tubid
253     def get_foolscap_write_enabler_seed(self):
254         return self._tubid
255
256     def get_nickname(self):
257         return self.announcement["nickname"]
258     def get_announcement(self):
259         return self.announcement
260     def get_remote_host(self):
261         return self.remote_host
262     def is_connected(self):
263         return self._is_connected
264     def get_last_connect_time(self):
265         return self.last_connect_time
266     def get_last_loss_time(self):
267         return self.last_loss_time
268     def get_last_received_data_time(self):
269         if self.rref is None:
270             return None
271         else:
272             return self.rref.getDataLastReceivedAt()
273
274     def get_available_space(self):
275         version = self.get_version()
276         if version is None:
277             return None
278         protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
279         available_space = protocol_v1_version.get('available-space')
280         if available_space is None:
281             available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
282         return available_space
283
284     def start_connecting(self, tub, trigger_cb):
285         furl = str(self.announcement["anonymous-storage-FURL"])
286         self._trigger_cb = trigger_cb
287         self._reconnector = tub.connectTo(furl, self._got_connection)
288
289     def _got_connection(self, rref):
290         lp = log.msg(format="got connection to %(name)s, getting versions",
291                      name=self.get_name(),
292                      facility="tahoe.storage_broker", umid="coUECQ")
293         if self._trigger_cb:
294             eventually(self._trigger_cb)
295         default = self.VERSION_DEFAULTS
296         d = add_version_to_remote_reference(rref, default)
297         d.addCallback(self._got_versioned_service, lp)
298         d.addErrback(log.err, format="storageclient._got_connection",
299                      name=self.get_name(), umid="Sdq3pg")
300
301     def _got_versioned_service(self, rref, lp):
302         log.msg(format="%(name)s provided version info %(version)s",
303                 name=self.get_name(), version=rref.version,
304                 facility="tahoe.storage_broker", umid="SWmJYg",
305                 level=log.NOISY, parent=lp)
306
307         self.last_connect_time = time.time()
308         self.remote_host = rref.getPeer()
309         self.rref = rref
310         self._is_connected = True
311         rref.notifyOnDisconnect(self._lost)
312
313     def get_rref(self):
314         return self.rref
315
316     def _lost(self):
317         log.msg(format="lost connection to %(name)s", name=self.get_name(),
318                 facility="tahoe.storage_broker", umid="zbRllw")
319         self.last_loss_time = time.time()
320         # self.rref is now stale: all callRemote()s will get a
321         # DeadReferenceError. We leave the stale reference in place so that
322         # uploader/downloader code (which received this IServer through
323         # get_connected_servers() or get_servers_for_psi()) can continue to
324         # use s.get_rref().callRemote() and not worry about it being None.
325         self._is_connected = False
326         self.remote_host = None
327
328     def stop_connecting(self):
329         # used when this descriptor has been superceded by another
330         self._reconnector.stopConnecting()
331
332     def try_to_connect(self):
333         # used when the broker wants us to hurry up
334         self._reconnector.reset()
335
336 class UnknownServerTypeError(Exception):
337     pass