]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
68823f01f834969100db12cee2235673df721f69
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32 import re, time
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
40
41 # who is responsible for de-duplication?
42 #  both?
43 #  IC remembers the unpacked announcements it receives, to provide for late
44 #  subscribers and to remove duplicates
45
46 # if a client subscribes after startup, will they receive old announcements?
47 #  yes
48
49 # who will be responsible for signature checking?
50 #  make it be IntroducerClient, so they can push the filter outwards and
51 #  reduce inbound network traffic
52
53 # what should the interface between StorageFarmBroker and IntroducerClient
54 # look like?
55 #  don't pass signatures: only pass validated blessed-objects
56
57 class StorageFarmBroker:
58     implements(IStorageBroker)
59     """I live on the client, and know about storage servers. For each server
60     that is participating in a grid, I either maintain a connection to it or
61     remember enough information to establish a connection to it on demand.
62     I'm also responsible for subscribing to the IntroducerClient to find out
63     about new servers as they are announced by the Introducer.
64     """
65     def __init__(self, tub, permute_peers):
66         self.tub = tub
67         assert permute_peers # False not implemented yet
68         self.permute_peers = permute_peers
69         # self.servers maps serverid -> IServer, and keeps track of all the
70         # storage servers that we've heard about. Each descriptor manages its
71         # own Reconnector, and will give us a RemoteReference when we ask
72         # them for it.
73         self.servers = {}
74         self.introducer_client = None
75
76     # these two are used in unit tests
77     def test_add_rref(self, serverid, rref, ann):
78         s = NativeStorageServer(serverid, ann.copy())
79         s.rref = rref
80         self.servers[serverid] = s
81
82     def test_add_server(self, serverid, s):
83         self.servers[serverid] = s
84
85     def use_introducer(self, introducer_client):
86         self.introducer_client = ic = introducer_client
87         ic.subscribe_to("storage", self._got_announcement)
88
89     def _got_announcement(self, key_s, ann):
90         if key_s is not None:
91             precondition(isinstance(key_s, str), key_s)
92             precondition(key_s.startswith("v0-"), key_s)
93         assert ann["service-name"] == "storage"
94         s = NativeStorageServer(key_s, ann)
95         serverid = s.get_serverid()
96         old = self.servers.get(serverid)
97         if old:
98             if old.get_announcement() == ann:
99                 return # duplicate
100             # replacement
101             del self.servers[serverid]
102             old.stop_connecting()
103             # now we forget about them and start using the new one
104         self.servers[serverid] = s
105         s.start_connecting(self.tub, self._trigger_connections)
106         # the descriptor will manage their own Reconnector, and each time we
107         # need servers, we'll ask them if they're connected or not.
108
109     def _trigger_connections(self):
110         # when one connection is established, reset the timers on all others,
111         # to trigger a reconnection attempt in one second. This is intended
112         # to accelerate server connections when we've been offline for a
113         # while. The goal is to avoid hanging out for a long time with
114         # connections to only a subset of the servers, which would increase
115         # the chances that we'll put shares in weird places (and not update
116         # existing shares of mutable files). See #374 for more details.
117         for dsc in self.servers.values():
118             dsc.try_to_connect()
119
120     def get_servers_for_psi(self, peer_selection_index):
121         # return a list of server objects (IServers)
122         assert self.permute_peers == True
123         def _permuted(server):
124             seed = server.get_permutation_seed()
125             return sha1(peer_selection_index + seed).digest()
126         return sorted(self.get_connected_servers(), key=_permuted)
127
128     def get_all_serverids(self):
129         return frozenset(self.servers.keys())
130
131     def get_connected_servers(self):
132         return frozenset([s for s in self.servers.values() if s.get_rref()])
133
134     def get_known_servers(self):
135         return frozenset(self.servers.values())
136
137     def get_nickname_for_serverid(self, serverid):
138         if serverid in self.servers:
139             return self.servers[serverid].get_nickname()
140         return None
141
142     def get_stub_server(self, serverid):
143         if serverid in self.servers:
144             return self.servers[serverid]
145         return StubServer(serverid)
146
147 class StubServer:
148     implements(IDisplayableServer)
149     def __init__(self, serverid):
150         self.serverid = serverid # binary tubid
151     def get_serverid(self):
152         return self.serverid
153     def get_name(self):
154         return base32.b2a(self.serverid)[:8]
155     def get_longname(self):
156         return base32.b2a(self.serverid)
157     def get_nickname(self):
158         return "?"
159
160 class NativeStorageServer:
161     """I hold information about a storage server that we want to connect to.
162     If we are connected, I hold the RemoteReference, their host address, and
163     the their version information. I remember information about when we were
164     last connected too, even if we aren't currently connected.
165
166     @ivar announcement_time: when we first heard about this service
167     @ivar last_connect_time: when we last established a connection
168     @ivar last_loss_time: when we last lost a connection
169
170     @ivar version: the server's versiondict, from the most recent announcement
171     @ivar nickname: the server's self-reported nickname (unicode), same
172
173     @ivar rref: the RemoteReference, if connected, otherwise None
174     @ivar remote_host: the IAddress, if connected, otherwise None
175     """
176     implements(IServer)
177
178     VERSION_DEFAULTS = {
179         "http://allmydata.org/tahoe/protocols/storage/v1" :
180         { "maximum-immutable-share-size": 2**32,
181           "tolerates-immutable-read-overrun": False,
182           "delete-mutable-shares-with-zero-length-writev": False,
183           },
184         "application-version": "unknown: no get_version()",
185         }
186
187     def __init__(self, key_s, ann, min_shares=1):
188         self.key_s = key_s
189         self.announcement = ann
190         self.min_shares = min_shares
191
192         assert "anonymous-storage-FURL" in ann, ann
193         furl = str(ann["anonymous-storage-FURL"])
194         m = re.match(r'pb://(\w+)@', furl)
195         assert m, furl
196         tubid_s = m.group(1).lower()
197         self._tubid = base32.a2b(tubid_s)
198         assert "permutation-seed-base32" in ann, ann
199         ps = base32.a2b(str(ann["permutation-seed-base32"]))
200         self._permutation_seed = ps
201
202         if key_s:
203             self._long_description = key_s
204             if key_s.startswith("v0-"):
205                 # remove v0- prefix from abbreviated name
206                 self._short_description = key_s[3:3+8]
207             else:
208                 self._short_description = key_s[:8]
209         else:
210             self._long_description = tubid_s
211             self._short_description = tubid_s[:6]
212
213         self.announcement_time = time.time()
214         self.last_connect_time = None
215         self.last_loss_time = None
216         self.remote_host = None
217         self.rref = None
218         self._reconnector = None
219         self._trigger_cb = None
220
221     # Special methods used by copy.copy() and copy.deepcopy(). When those are
222     # used in allmydata.immutable.filenode to copy CheckResults during
223     # repair, we want it to treat the IServer instances as singletons, and
224     # not attempt to duplicate them..
225     def __copy__(self):
226         return self
227     def __deepcopy__(self, memodict):
228         return self
229
230     def __repr__(self):
231         return "<NativeStorageServer for %s>" % self.get_name()
232     def get_serverid(self):
233         return self._tubid # XXX replace with self.key_s
234     def get_permutation_seed(self):
235         return self._permutation_seed
236     def get_version(self):
237         if self.rref:
238             return self.rref.version
239         return None
240     def get_name(self): # keep methodname short
241         # TODO: decide who adds [] in the short description. It should
242         # probably be the output side, not here.
243         return self._short_description
244     def get_longname(self):
245         return self._long_description
246     def get_lease_seed(self):
247         return self._tubid
248     def get_foolscap_write_enabler_seed(self):
249         return self._tubid
250
251     def get_nickname(self):
252         return self.announcement["nickname"].decode("utf-8")
253     def get_announcement(self):
254         return self.announcement
255     def get_remote_host(self):
256         return self.remote_host
257     def get_last_connect_time(self):
258         return self.last_connect_time
259     def get_last_loss_time(self):
260         return self.last_loss_time
261     def get_announcement_time(self):
262         return self.announcement_time
263
264     def start_connecting(self, tub, trigger_cb):
265         furl = str(self.announcement["anonymous-storage-FURL"])
266         self._trigger_cb = trigger_cb
267         self._reconnector = tub.connectTo(furl, self._got_connection)
268
269     def _got_connection(self, rref):
270         lp = log.msg(format="got connection to %(name)s, getting versions",
271                      name=self.get_name(),
272                      facility="tahoe.storage_broker", umid="coUECQ")
273         if self._trigger_cb:
274             eventually(self._trigger_cb)
275         default = self.VERSION_DEFAULTS
276         d = add_version_to_remote_reference(rref, default)
277         d.addCallback(self._got_versioned_service, lp)
278         d.addErrback(log.err, format="storageclient._got_connection",
279                      name=self.get_name(), umid="Sdq3pg")
280
281     def _got_versioned_service(self, rref, lp):
282         log.msg(format="%(name)s provided version info %(version)s",
283                 name=self.get_name(), version=rref.version,
284                 facility="tahoe.storage_broker", umid="SWmJYg",
285                 level=log.NOISY, parent=lp)
286
287         self.last_connect_time = time.time()
288         self.remote_host = rref.getPeer()
289         self.rref = rref
290         rref.notifyOnDisconnect(self._lost)
291
292     def get_rref(self):
293         return self.rref
294
295     def _lost(self):
296         log.msg(format="lost connection to %(name)s", name=self.get_name(),
297                 facility="tahoe.storage_broker", umid="zbRllw")
298         self.last_loss_time = time.time()
299         self.rref = None
300         self.remote_host = None
301
302     def stop_connecting(self):
303         # used when this descriptor has been superceded by another
304         self._reconnector.stopConnecting()
305
306     def try_to_connect(self):
307         # used when the broker wants us to hurry up
308         self._reconnector.reset()
309
310 class UnknownServerTypeError(Exception):
311     pass