]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
more #859: avoid deprecation warning for unit tests too, hush pyflakes
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32 import time
33 from zope.interface import implements, Interface
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker
36 from allmydata.util import idlib, log
37 from allmydata.util.assertutil import _assert, precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
40
41 # who is responsible for de-duplication?
42 #  both?
43 #  IC remembers the unpacked announcements it receives, to provide for late
44 #  subscribers and to remove duplicates
45
46 # if a client subscribes after startup, will they receive old announcements?
47 #  yes
48
49 # who will be responsible for signature checking?
50 #  make it be IntroducerClient, so they can push the filter outwards and
51 #  reduce inbound network traffic
52
53 # what should the interface between StorageFarmBroker and IntroducerClient
54 # look like?
55 #  don't pass signatures: only pass validated blessed-objects
56
57 class StorageFarmBroker:
58     implements(IStorageBroker)
59     """I live on the client, and know about storage servers. For each server
60     that is participating in a grid, I either maintain a connection to it or
61     remember enough information to establish a connection to it on demand.
62     I'm also responsible for subscribing to the IntroducerClient to find out
63     about new servers as they are announced by the Introducer.
64     """
65     def __init__(self, tub, permute_peers):
66         self.tub = tub
67         assert permute_peers # False not implemented yet
68         self.permute_peers = permute_peers
69         # self.descriptors maps serverid -> IServerDescriptor, and keeps
70         # track of all the storage servers that we've heard about. Each
71         # descriptor manages its own Reconnector, and will give us a
72         # RemoteReference when we ask them for it.
73         self.descriptors = {}
74         # self.test_servers are statically configured from unit tests
75         self.test_servers = {} # serverid -> rref
76         self.introducer_client = None
77
78     # these two are used in unit tests
79     def test_add_server(self, serverid, rref):
80         self.test_servers[serverid] = rref
81     def test_add_descriptor(self, serverid, dsc):
82         self.descriptors[serverid] = dsc
83
84     def use_introducer(self, introducer_client):
85         self.introducer_client = ic = introducer_client
86         ic.subscribe_to("storage", self._got_announcement)
87
88     def _got_announcement(self, serverid, ann_d):
89         precondition(isinstance(serverid, str), serverid)
90         precondition(len(serverid) == 20, serverid)
91         assert ann_d["service-name"] == "storage"
92         old = self.descriptors.get(serverid)
93         if old:
94             if old.get_announcement() == ann_d:
95                 return # duplicate
96             # replacement
97             del self.descriptors[serverid]
98             old.stop_connecting()
99             # now we forget about them and start using the new one
100         dsc = NativeStorageClientDescriptor(serverid, ann_d)
101         self.descriptors[serverid] = dsc
102         dsc.start_connecting(self.tub, self._trigger_connections)
103         # the descriptor will manage their own Reconnector, and each time we
104         # need servers, we'll ask them if they're connected or not.
105
106     def _trigger_connections(self):
107         # when one connection is established, reset the timers on all others,
108         # to trigger a reconnection attempt in one second. This is intended
109         # to accelerate server connections when we've been offline for a
110         # while. The goal is to avoid hanging out for a long time with
111         # connections to only a subset of the servers, which would increase
112         # the chances that we'll put shares in weird places (and not update
113         # existing shares of mutable files). See #374 for more details.
114         for dsc in self.descriptors.values():
115             dsc.try_to_connect()
116
117
118
119     def get_servers_for_index(self, peer_selection_index):
120         # first cut: return a list of (peerid, versioned-rref) tuples
121         assert self.permute_peers == True
122         servers = self.get_all_servers()
123         key = peer_selection_index
124         return sorted(servers, key=lambda x: sha1(key+x[0]).digest())
125
126     def get_all_servers(self):
127         # return a frozenset of (peerid, versioned-rref) tuples
128         servers = {}
129         for serverid,rref in self.test_servers.items():
130             servers[serverid] = rref
131         for serverid,dsc in self.descriptors.items():
132             rref = dsc.get_rref()
133             if rref:
134                 servers[serverid] = rref
135         result = frozenset(servers.items())
136         _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
137         return result
138
139     def get_all_serverids(self):
140         serverids = set()
141         serverids.update(self.test_servers.keys())
142         serverids.update(self.descriptors.keys())
143         return frozenset(serverids)
144
145     def get_all_descriptors(self):
146         return sorted(self.descriptors.values(),
147                       key=lambda dsc: dsc.get_serverid())
148
149     def get_nickname_for_serverid(self, serverid):
150         if serverid in self.descriptors:
151             return self.descriptors[serverid].get_nickname()
152         return None
153
154
155 class IServerDescriptor(Interface):
156     def start_connecting(tub, trigger_cb):
157         pass
158     def get_nickname():
159         pass
160     def get_rref():
161         pass
162
163 class NativeStorageClientDescriptor:
164     """I hold information about a storage server that we want to connect to.
165     If we are connected, I hold the RemoteReference, their host address, and
166     the their version information. I remember information about when we were
167     last connected too, even if we aren't currently connected.
168
169     @ivar announcement_time: when we first heard about this service
170     @ivar last_connect_time: when we last established a connection
171     @ivar last_loss_time: when we last lost a connection
172
173     @ivar version: the server's versiondict, from the most recent announcement
174     @ivar nickname: the server's self-reported nickname (unicode), same
175
176     @ivar rref: the RemoteReference, if connected, otherwise None
177     @ivar remote_host: the IAddress, if connected, otherwise None
178     """
179     implements(IServerDescriptor)
180
181     VERSION_DEFAULTS = {
182         "http://allmydata.org/tahoe/protocols/storage/v1" :
183         { "maximum-immutable-share-size": 2**32,
184           "tolerates-immutable-read-overrun": False,
185           "delete-mutable-shares-with-zero-length-writev": False,
186           },
187         "application-version": "unknown: no get_version()",
188         }
189
190     def __init__(self, serverid, ann_d, min_shares=1):
191         self.serverid = serverid
192         self.announcement = ann_d
193         self.min_shares = min_shares
194
195         self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
196         self.announcement_time = time.time()
197         self.last_connect_time = None
198         self.last_loss_time = None
199         self.remote_host = None
200         self.rref = None
201         self._reconnector = None
202         self._trigger_cb = None
203
204     def get_serverid(self):
205         return self.serverid
206
207     def get_nickname(self):
208         return self.announcement["nickname"].decode("utf-8")
209     def get_announcement(self):
210         return self.announcement
211     def get_remote_host(self):
212         return self.remote_host
213     def get_last_connect_time(self):
214         return self.last_connect_time
215     def get_last_loss_time(self):
216         return self.last_loss_time
217     def get_announcement_time(self):
218         return self.announcement_time
219
220     def start_connecting(self, tub, trigger_cb):
221         furl = self.announcement["FURL"]
222         self._trigger_cb = trigger_cb
223         self._reconnector = tub.connectTo(furl, self._got_connection)
224
225     def _got_connection(self, rref):
226         lp = log.msg(format="got connection to %(serverid)s, getting versions",
227                      serverid=self.serverid_s,
228                      facility="tahoe.storage_broker", umid="coUECQ")
229         if self._trigger_cb:
230             eventually(self._trigger_cb)
231         default = self.VERSION_DEFAULTS
232         d = add_version_to_remote_reference(rref, default)
233         d.addCallback(self._got_versioned_service, lp)
234         d.addErrback(log.err, format="storageclient._got_connection",
235                      serverid=self.serverid_s, umid="Sdq3pg")
236
237     def _got_versioned_service(self, rref, lp):
238         log.msg(format="%(serverid)s provided version info %(version)s",
239                 serverid=self.serverid_s, version=rref.version,
240                 facility="tahoe.storage_broker", umid="SWmJYg",
241                 level=log.NOISY, parent=lp)
242
243         self.last_connect_time = time.time()
244         self.remote_host = rref.getPeer()
245         self.rref = rref
246         rref.notifyOnDisconnect(self._lost)
247
248     def get_rref(self):
249         return self.rref
250
251     def _lost(self):
252         log.msg(format="lost connection to %(serverid)s",
253                 serverid=self.serverid_s,
254                 facility="tahoe.storage_broker", umid="zbRllw")
255         self.last_loss_time = time.time()
256         self.rref = None
257         self.remote_host = None
258
259     def stop_connecting(self):
260         # used when this descriptor has been superceded by another
261         self._reconnector.stopConnecting()
262
263     def try_to_connect(self):
264         # used when the broker wants us to hurry up
265         self._reconnector.reset()
266
267 class UnknownServerTypeError(Exception):
268     pass