]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/storage_client.py
big rework of introducer client: change local API, split division of responsibilites...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / storage_client.py
1
2 """
3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
5 """
6
7 # roadmap:
8 #
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
13 #
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
16 #
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
19 # or Client
20 #
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
23 #
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
28 #
29 # 6: implement other sorts of IStorageClient classes: S3, etc
30
31 import sha, time
32 from zope.interface import implements, Interface
33 from foolscap.api import eventually
34 from allmydata.interfaces import IStorageBroker
35 from allmydata.util import idlib, log
36 from allmydata.util.rrefutil import add_version_to_remote_reference
37
38 # who is responsible for de-duplication?
39 #  both?
40 #  IC remembers the unpacked announcements it receives, to provide for late
41 #  subscribers and to remove duplicates
42
43 # if a client subscribes after startup, will they receive old announcements?
44 #  yes
45
46 # who will be responsible for signature checking?
47 #  make it be IntroducerClient, so they can push the filter outwards and
48 #  reduce inbound network traffic
49
50 # what should the interface between StorageFarmBroker and IntroducerClient
51 # look like?
52 #  don't pass signatures: only pass validated blessed-objects
53
54 class StorageFarmBroker:
55     implements(IStorageBroker)
56     """I live on the client, and know about storage servers. For each server
57     that is participating in a grid, I either maintain a connection to it or
58     remember enough information to establish a connection to it on demand.
59     I'm also responsible for subscribing to the IntroducerClient to find out
60     about new servers as they are announced by the Introducer.
61     """
62     def __init__(self, tub, permute_peers):
63         self.tub = tub
64         assert permute_peers # False not implemented yet
65         self.permute_peers = permute_peers
66         # self.descriptors maps serverid -> IServerDescriptor, and keeps
67         # track of all the storage servers that we've heard about. Each
68         # descriptor manages its own Reconnector, and will give us a
69         # RemoteReference when we ask them for it.
70         self.descriptors = {}
71         # self.servers are statically configured from unit tests
72         self.test_servers = {} # serverid -> rref
73         self.introducer_client = None
74
75     # these two are used in unit tests
76     def test_add_server(self, serverid, rref):
77         self.test_servers[serverid] = rref
78     def test_add_descriptor(self, serverid, dsc):
79         self.descriptors[serverid] = dsc
80
81     def use_introducer(self, introducer_client):
82         self.introducer_client = ic = introducer_client
83         ic.subscribe_to("storage", self._got_announcement)
84
85     def _got_announcement(self, serverid, ann_d):
86         assert ann_d["service-name"] == "storage"
87         old = self.descriptors.get(serverid)
88         if old:
89             if old.get_announcement() == ann_d:
90                 return # duplicate
91             # replacement
92             del self.descriptors[serverid]
93             old.stop_connecting()
94             # now we forget about them and start using the new one
95         dsc = NativeStorageClientDescriptor(serverid, ann_d)
96         self.descriptors[serverid] = dsc
97         dsc.start_connecting(self.tub, self._trigger_connections)
98         # the descriptor will manage their own Reconnector, and each time we
99         # need servers, we'll ask them if they're connected or not.
100
101     def _trigger_connections(self):
102         # when one connection is established, reset the timers on all others,
103         # to trigger a reconnection attempt in one second. This is intended
104         # to accelerate server connections when we've been offline for a
105         # while. The goal is to avoid hanging out for a long time with
106         # connections to only a subset of the servers, which would increase
107         # the chances that we'll put shares in weird places (and not update
108         # existing shares of mutable files). See #374 for more details.
109         for dsc in self.descriptors.values():
110             dsc.try_to_connect()
111
112
113
114     def get_servers_for_index(self, peer_selection_index):
115         # first cut: return a list of (peerid, versioned-rref) tuples
116         assert self.permute_peers == True
117         servers = self.get_all_servers()
118         key = peer_selection_index
119         return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
120
121     def get_all_servers(self):
122         # return a frozenset of (peerid, versioned-rref) tuples
123         servers = {}
124         for serverid,rref in self.test_servers.items():
125             servers[serverid] = rref
126         for serverid,dsc in self.descriptors.items():
127             rref = dsc.get_rref()
128             if rref:
129                 servers[serverid] = rref
130         return frozenset(servers.items())
131
132     def get_all_serverids(self):
133         serverids = set()
134         serverids.update(self.test_servers.keys())
135         serverids.update(self.descriptors.keys())
136         return frozenset(serverids)
137
138     def get_all_descriptors(self):
139         return sorted(self.descriptors.values(),
140                       key=lambda dsc: dsc.get_serverid())
141
142     def get_nickname_for_serverid(self, serverid):
143         if serverid in self.descriptors:
144             return self.descriptors[serverid].get_nickname()
145         return None
146
147
148 class IServerDescriptor(Interface):
149     def start_connecting(tub, trigger_cb):
150         pass
151     def get_nickname():
152         pass
153     def get_rref():
154         pass
155
156 class NativeStorageClientDescriptor:
157     """I hold information about a storage server that we want to connect to.
158     If we are connected, I hold the RemoteReference, their host address, and
159     the their version information. I remember information about when we were
160     last connected too, even if we aren't currently connected.
161
162     @ivar announcement_time: when we first heard about this service
163     @ivar last_connect_time: when we last established a connection
164     @ivar last_loss_time: when we last lost a connection
165
166     @ivar version: the server's versiondict, from the most recent announcement
167     @ivar nickname: the server's self-reported nickname (unicode), same
168
169     @ivar rref: the RemoteReference, if connected, otherwise None
170     @ivar remote_host: the IAddress, if connected, otherwise None
171     """
172     implements(IServerDescriptor)
173
174     VERSION_DEFAULTS = {
175         "http://allmydata.org/tahoe/protocols/storage/v1" :
176         { "maximum-immutable-share-size": 2**32,
177           "tolerates-immutable-read-overrun": False,
178           "delete-mutable-shares-with-zero-length-writev": False,
179           },
180         "application-version": "unknown: no get_version()",
181         }
182
183     def __init__(self, serverid, ann_d, min_shares=1):
184         self.serverid = serverid
185         self.announcement = ann_d
186         self.min_shares = min_shares
187
188         self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
189         self.announcement_time = time.time()
190         self.last_connect_time = None
191         self.last_loss_time = None
192         self.remote_host = None
193         self.rref = None
194         self._reconnector = None
195         self._trigger_cb = None
196
197     def get_serverid(self):
198         return self.serverid
199
200     def get_nickname(self):
201         return self.announcement["nickname"].decode("utf-8")
202     def get_announcement(self):
203         return self.announcement
204     def get_remote_host(self):
205         return self.remote_host
206     def get_last_connect_time(self):
207         return self.last_connect_time
208     def get_last_loss_time(self):
209         return self.last_loss_time
210     def get_announcement_time(self):
211         return self.announcement_time
212
213     def start_connecting(self, tub, trigger_cb):
214         furl = self.announcement["FURL"]
215         self._trigger_cb = trigger_cb
216         self._reconnector = tub.connectTo(furl, self._got_connection)
217
218     def _got_connection(self, rref):
219         lp = log.msg(format="got connection to %(serverid)s, getting versions",
220                      serverid=self.serverid_s,
221                      facility="tahoe.storage_broker", umid="coUECQ")
222         if self._trigger_cb:
223             eventually(self._trigger_cb)
224         default = self.VERSION_DEFAULTS
225         d = add_version_to_remote_reference(rref, default)
226         d.addCallback(self._got_versioned_service, lp)
227         d.addErrback(log.err, format="storageclient._got_connection",
228                      serverid=self.serverid_s, umid="Sdq3pg")
229
230     def _got_versioned_service(self, rref, lp):
231         log.msg(format="%(serverid)s provided version info %(version)s",
232                 serverid=self.serverid_s, version=rref.version,
233                 facility="tahoe.storage_broker", umid="SWmJYg",
234                 level=log.NOISY, parent=lp)
235
236         self.last_connect_time = time.time()
237         self.remote_host = rref.getPeer()
238         self.rref = rref
239         rref.notifyOnDisconnect(self._lost)
240
241     def get_rref(self):
242         return self.rref
243
244     def _lost(self):
245         log.msg(format="lost connection to %(serverid)s",
246                 serverid=self.serverid_s,
247                 facility="tahoe.storage_broker", umid="zbRllw")
248         self.last_loss_time = time.time()
249         self.rref = None
250         self.remote_host = None
251
252     def stop_connecting(self):
253         # used when this descriptor has been superceded by another
254         self._reconnector.stopConnecting()
255
256     def try_to_connect(self):
257         # used when the broker wants us to hurry up
258         self._reconnector.reset()
259
260 class UnknownServerTypeError(Exception):
261     pass