]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
add remaining get_* methods to storage_client.Server, NoNetworkServer, and
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32 import time
33 from zope.interface import implements, Interface
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker
36 from allmydata.util import idlib, log
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
40
41 # who is responsible for de-duplication?
42 #  both?
43 #  IC remembers the unpacked announcements it receives, to provide for late
44 #  subscribers and to remove duplicates
45
46 # if a client subscribes after startup, will they receive old announcements?
47 #  yes
48
49 # who will be responsible for signature checking?
50 #  make it be IntroducerClient, so they can push the filter outwards and
51 #  reduce inbound network traffic
52
53 # what should the interface between StorageFarmBroker and IntroducerClient
54 # look like?
55 #  don't pass signatures: only pass validated blessed-objects
56
57 class StorageFarmBroker:
58     implements(IStorageBroker)
59     """I live on the client, and know about storage servers. For each server
60     that is participating in a grid, I either maintain a connection to it or
61     remember enough information to establish a connection to it on demand.
62     I'm also responsible for subscribing to the IntroducerClient to find out
63     about new servers as they are announced by the Introducer.
64     """
65     def __init__(self, tub, permute_peers):
66         self.tub = tub
67         assert permute_peers # False not implemented yet
68         self.permute_peers = permute_peers
69         # self.servers maps serverid -> IServer, and keeps track of all the
70         # storage servers that we've heard about. Each descriptor manages its
71         # own Reconnector, and will give us a RemoteReference when we ask
72         # them for it.
73         self.servers = {}
74         self.introducer_client = None
75
76     # these two are used in unit tests
77     def test_add_rref(self, serverid, rref):
78         s = NativeStorageServer(serverid, {})
79         s.rref = rref
80         self.servers[serverid] = s
81
82     def test_add_server(self, serverid, s):
83         self.servers[serverid] = s
84
85     def use_introducer(self, introducer_client):
86         self.introducer_client = ic = introducer_client
87         ic.subscribe_to("storage", self._got_announcement)
88
89     def _got_announcement(self, serverid, ann_d):
90         precondition(isinstance(serverid, str), serverid)
91         precondition(len(serverid) == 20, serverid)
92         assert ann_d["service-name"] == "storage"
93         old = self.servers.get(serverid)
94         if old:
95             if old.get_announcement() == ann_d:
96                 return # duplicate
97             # replacement
98             del self.servers[serverid]
99             old.stop_connecting()
100             # now we forget about them and start using the new one
101         dsc = NativeStorageServer(serverid, ann_d)
102         self.servers[serverid] = dsc
103         dsc.start_connecting(self.tub, self._trigger_connections)
104         # the descriptor will manage their own Reconnector, and each time we
105         # need servers, we'll ask them if they're connected or not.
106
107     def _trigger_connections(self):
108         # when one connection is established, reset the timers on all others,
109         # to trigger a reconnection attempt in one second. This is intended
110         # to accelerate server connections when we've been offline for a
111         # while. The goal is to avoid hanging out for a long time with
112         # connections to only a subset of the servers, which would increase
113         # the chances that we'll put shares in weird places (and not update
114         # existing shares of mutable files). See #374 for more details.
115         for dsc in self.servers.values():
116             dsc.try_to_connect()
117
118     def get_servers_for_psi(self, peer_selection_index):
119         # return a list of server objects (IServers)
120         assert self.permute_peers == True
121         def _permuted(server):
122             seed = server.get_permutation_seed()
123             return sha1(peer_selection_index + seed).digest()
124         return sorted(self.get_connected_servers(), key=_permuted)
125
126     def get_all_serverids(self):
127         serverids = set()
128         serverids.update(self.servers.keys())
129         return frozenset(serverids)
130
131     def get_connected_servers(self):
132         return frozenset([s for s in self.get_known_servers()
133                           if s.get_rref()])
134
135     def get_known_servers(self):
136         return sorted(self.servers.values(), key=lambda s: s.get_serverid())
137
138     def get_nickname_for_serverid(self, serverid):
139         if serverid in self.servers:
140             return self.servers[serverid].get_nickname()
141         return None
142
143
144 class IServer(Interface):
145     """I live in the client, and represent a single server."""
146     def start_connecting(tub, trigger_cb):
147         pass
148     def get_nickname():
149         pass
150     def get_rref():
151         pass
152
153 class NativeStorageServer:
154     """I hold information about a storage server that we want to connect to.
155     If we are connected, I hold the RemoteReference, their host address, and
156     the their version information. I remember information about when we were
157     last connected too, even if we aren't currently connected.
158
159     @ivar announcement_time: when we first heard about this service
160     @ivar last_connect_time: when we last established a connection
161     @ivar last_loss_time: when we last lost a connection
162
163     @ivar version: the server's versiondict, from the most recent announcement
164     @ivar nickname: the server's self-reported nickname (unicode), same
165
166     @ivar rref: the RemoteReference, if connected, otherwise None
167     @ivar remote_host: the IAddress, if connected, otherwise None
168     """
169     implements(IServer)
170
171     VERSION_DEFAULTS = {
172         "http://allmydata.org/tahoe/protocols/storage/v1" :
173         { "maximum-immutable-share-size": 2**32,
174           "tolerates-immutable-read-overrun": False,
175           "delete-mutable-shares-with-zero-length-writev": False,
176           },
177         "application-version": "unknown: no get_version()",
178         }
179
180     def __init__(self, serverid, ann_d, min_shares=1):
181         self.serverid = serverid
182         self._tubid = serverid
183         self.announcement = ann_d
184         self.min_shares = min_shares
185
186         self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
187         self.announcement_time = time.time()
188         self.last_connect_time = None
189         self.last_loss_time = None
190         self.remote_host = None
191         self.rref = None
192         self._reconnector = None
193         self._trigger_cb = None
194
195     def __repr__(self):
196         return "<NativeStorageServer for %s>" % self.name()
197     def get_serverid(self):
198         return self._tubid
199     def get_permutation_seed(self):
200         return self._tubid
201     def get_version(self):
202         if self.rref:
203             return self.rref.version
204         return None
205     def name(self): # keep methodname short
206         return self.serverid_s
207     def longname(self):
208         return idlib.nodeid_b2a(self._tubid)
209     def get_lease_seed(self):
210         return self._tubid
211     def get_foolscap_write_enabler_seed(self):
212         return self._tubid
213
214     def get_nickname(self):
215         return self.announcement["nickname"].decode("utf-8")
216     def get_announcement(self):
217         return self.announcement
218     def get_remote_host(self):
219         return self.remote_host
220     def get_last_connect_time(self):
221         return self.last_connect_time
222     def get_last_loss_time(self):
223         return self.last_loss_time
224     def get_announcement_time(self):
225         return self.announcement_time
226
227     def start_connecting(self, tub, trigger_cb):
228         furl = self.announcement["FURL"]
229         self._trigger_cb = trigger_cb
230         self._reconnector = tub.connectTo(furl, self._got_connection)
231
232     def _got_connection(self, rref):
233         lp = log.msg(format="got connection to %(name)s, getting versions",
234                      name=self.name(),
235                      facility="tahoe.storage_broker", umid="coUECQ")
236         if self._trigger_cb:
237             eventually(self._trigger_cb)
238         default = self.VERSION_DEFAULTS
239         d = add_version_to_remote_reference(rref, default)
240         d.addCallback(self._got_versioned_service, lp)
241         d.addErrback(log.err, format="storageclient._got_connection",
242                      name=self.name(), umid="Sdq3pg")
243
244     def _got_versioned_service(self, rref, lp):
245         log.msg(format="%(name)s provided version info %(version)s",
246                 name=self.name(), version=rref.version,
247                 facility="tahoe.storage_broker", umid="SWmJYg",
248                 level=log.NOISY, parent=lp)
249
250         self.last_connect_time = time.time()
251         self.remote_host = rref.getPeer()
252         self.rref = rref
253         rref.notifyOnDisconnect(self._lost)
254
255     def get_rref(self):
256         return self.rref
257
258     def _lost(self):
259         log.msg(format="lost connection to %(name)s", name=self.name(),
260                 facility="tahoe.storage_broker", umid="zbRllw")
261         self.last_loss_time = time.time()
262         self.rref = None
263         self.remote_host = None
264
265     def stop_connecting(self):
266         # used when this descriptor has been superceded by another
267         self._reconnector.stopConnecting()
268
269     def try_to_connect(self):
270         # used when the broker wants us to hurry up
271         self._reconnector.reset()
272
273 class UnknownServerTypeError(Exception):
274     pass