3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
53 # what should the interface between StorageFarmBroker and IntroducerClient
55 # don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
    implements(IStorageBroker)
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    # NOTE(review): the __init__ signature below appears truncated in this
    # chunk -- the closing parenthesis (and presumably a preferred_peers
    # parameter, given the assignment below) is not visible; the lines that
    # initialize self.servers and self.tub also appear elided. Confirm
    # against the full file.
    def __init__(self, tub, permute_peers, connected_threshold, connected_d,
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # connected_d is fired once this many servers are connected; see
        # check_enough_connected()
        self.connected_threshold = connected_threshold
        self.connected_d = connected_d
        # longnames of servers to sort ahead of others in get_servers_for_psi()
        self.preferred_peers = preferred_peers
        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # set by use_introducer(); None until then
        self.introducer_client = None
    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        # Inject a server that already has an established RemoteReference,
        # bypassing the introducer/Reconnector machinery (unit-test hook).
        s = NativeStorageServer(serverid, ann.copy(), self)
        # NOTE(review): a line appears to be elided here in this chunk --
        # presumably the one attaching rref to the new server. Confirm
        # against the full file.
        s._is_connected = True
        self.servers[serverid] = s
87 def test_add_server(self, serverid, s):
88 self.servers[serverid] = s
90 def use_introducer(self, introducer_client):
91 self.introducer_client = ic = introducer_client
92 ic.subscribe_to("storage", self._got_announcement)
    def _got_announcement(self, key_s, ann):
        # Callback for IntroducerClient.subscribe_to("storage", ...): build a
        # NativeStorageServer from the announcement and start connecting.
        # key_s is the server's "v0-"-prefixed pubkey string.
        precondition(isinstance(key_s, str), key_s)
        precondition(key_s.startswith("v0-"), key_s)
        assert ann["service-name"] == "storage"
        s = NativeStorageServer(key_s, ann, self)
        serverid = s.get_serverid()
        old = self.servers.get(serverid)
        # NOTE(review): this chunk appears to be missing lines here (an
        # 'if old:' guard and an early return for duplicate announcements,
        # judging by the indentation below). Confirm against the full file.
            if old.get_announcement() == ann:
            del self.servers[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        self.servers[serverid] = s
        s.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.
    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            # NOTE(review): the loop body is not visible in this chunk --
            # presumably dsc.try_to_connect(). Confirm against the full file.
    def check_enough_connected(self):
        # Fire self.connected_d (once) when the number of connected servers
        # reaches connected_threshold; called after each server handshake.
        if (self.connected_d is not None and
            len(self.get_connected_servers()) >= self.connected_threshold):
            # NOTE(review): a line appears to be elided here in this chunk
            # (grabbing connected_d into a local before clearing it, so the
            # Deferred can still be fired). Confirm against the full file.
            self.connected_d = None
132 def get_servers_for_psi(self, peer_selection_index):
133 # return a list of server objects (IServers)
134 assert self.permute_peers == True
135 connected_servers = self.get_connected_servers()
136 preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
137 def _permuted(server):
138 seed = server.get_permutation_seed()
139 is_unpreferred = server not in preferred_servers
140 return (is_unpreferred, sha1(peer_selection_index + seed).digest())
141 return sorted(connected_servers, key=_permuted)
143 def get_all_serverids(self):
144 return frozenset(self.servers.keys())
146 def get_connected_servers(self):
147 return frozenset([s for s in self.servers.values() if s.is_connected()])
149 def get_known_servers(self):
150 return frozenset(self.servers.values())
    def get_nickname_for_serverid(self, serverid):
        # Return the server's self-reported nickname, if we know the server.
        # NOTE(review): the fallthrough for unknown serverids (presumably
        # 'return None') is not visible in this chunk.
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
157 def get_stub_server(self, serverid):
158 if serverid in self.servers:
159 return self.servers[serverid]
160 return StubServer(serverid)
    # NOTE(review): the enclosing 'class StubServer:' header is not visible
    # in this chunk; the following lines clearly belong to that class.
    implements(IDisplayableServer)
    def __init__(self, serverid):
        self.serverid = serverid # binary tubid
    def get_serverid(self):
        # NOTE(review): the body of get_serverid is not visible here; the
        # abbreviated return below looks like it belongs to a different
        # accessor (a short-name method) whose 'def' line is also elided.
        return base32.b2a(self.serverid)[:8]
    def get_longname(self):
        # full base32 rendering of the binary serverid
        return base32.b2a(self.serverid)
    def get_nickname(self):
        # NOTE(review): body not visible in this chunk.
class NativeStorageServer:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the server's versiondict, from the most recent announcement
    @ivar nickname: the server's self-reported nickname (unicode), same

    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    # NOTE(review): the assignment introducing the dict below (presumably
    # 'VERSION_DEFAULTS = {') is not visible in this chunk. These defaults
    # are handed to add_version_to_remote_reference() in _got_connection(),
    # covering servers whose version exchange reports nothing.
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32 - 1,
          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          "available-space": None,
        "application-version": "unknown: no get_version()",
    def __init__(self, key_s, ann, broker):
        # key_s: the server's "v0-"-prefixed pubkey string (a tubid-derived
        #   fallback path also appears below); ann: the announcement dict;
        #   broker: the owning StorageFarmBroker (read via self.broker in
        #   _got_connection, though the line storing it is not visible here).
        self.announcement = ann
        assert "anonymous-storage-FURL" in ann, ann
        furl = str(ann["anonymous-storage-FURL"])
        # extract the base32 tubid from the FURL: pb://<tubid>@<hints>/...
        m = re.match(r'pb://(\w+)@', furl)
        tubid_s = m.group(1).lower()
        self._tubid = base32.a2b(tubid_s)
        assert "permutation-seed-base32" in ann, ann
        ps = base32.a2b(str(ann["permutation-seed-base32"]))
        self._permutation_seed = ps
        self._long_description = key_s
        if key_s.startswith("v0-"):
            # remove v0- prefix from abbreviated name
            self._short_description = key_s[3:3+8]
        # NOTE(review): 'else:' branches appear to be elided from this chunk
        # around here; the trailing tubid_s assignments look like the
        # no-pubkey fallback path. Confirm against the full file.
            self._short_description = key_s[:8]
        self._long_description = tubid_s
        self._short_description = tubid_s[:6]
        # connection bookkeeping, updated by _got_versioned_service/_lost
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None
    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __deepcopy__(self, memodict):
        # NOTE(review): the body (presumably 'return self', per the comment
        # above) is not visible in this chunk, and the return below looks
        # like it belongs to a __repr__ whose 'def' line is also elided.
        return "<NativeStorageServer for %s>" % self.get_name()
250 def get_serverid(self):
251 return self._tubid # XXX replace with self.key_s
252 def get_permutation_seed(self):
253 return self._permutation_seed
    def get_version(self):
        # Return the versiondict attached to the RemoteReference.
        # NOTE(review): a guard line (presumably 'if self.rref:') appears to
        # be elided from this chunk; as shown this would raise while rref is
        # still None. Confirm against the full file.
        return self.rref.version
258 def get_name(self): # keep methodname short
259 # TODO: decide who adds [] in the short description. It should
260 # probably be the output side, not here.
261 return self._short_description
262 def get_longname(self):
263 return self._long_description
    def get_lease_seed(self):
        # NOTE(review): body not visible in this chunk.
    def get_foolscap_write_enabler_seed(self):
        # NOTE(review): body not visible in this chunk.
269 def get_nickname(self):
270 return self.announcement["nickname"]
271 def get_announcement(self):
272 return self.announcement
273 def get_remote_host(self):
274 return self.remote_host
275 def is_connected(self):
276 return self._is_connected
277 def get_last_connect_time(self):
278 return self.last_connect_time
279 def get_last_loss_time(self):
280 return self.last_loss_time
281 def get_announcement_time(self):
282 return self.announcement_time
    def get_available_space(self):
        # Return the server's advertised available space in bytes, falling
        # back to its maximum-immutable-share-size when 'available-space'
        # is not reported.
        version = self.get_version()
        # NOTE(review): a 'version is None' guard appears to be elided from
        # this chunk here. Confirm against the full file.
        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
        available_space = protocol_v1_version.get('available-space')
        if available_space is None:
            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
        return available_space
294 def start_connecting(self, tub, trigger_cb):
295 furl = str(self.announcement["anonymous-storage-FURL"])
296 self._trigger_cb = trigger_cb
297 self._reconnector = tub.connectTo(furl, self._got_connection)
    def _got_connection(self, rref):
        # Reconnector callback: we have a fresh RemoteReference. Fetch the
        # server's version information before marking ourselves connected.
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        # NOTE(review): a guard line may be elided here in this chunk --
        # _trigger_cb is None until start_connecting() runs. Confirm.
        # kick the broker's other Reconnectors (see _trigger_connections)
        eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addCallback(lambda ign: self.broker.check_enough_connected())
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")
    def _got_versioned_service(self, rref, lp):
        # The version exchange finished: record the connection metadata and
        # mark this server as connected.
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)
        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        # NOTE(review): the line saving the reference (presumably
        # 'self.rref = rref') is not visible in this chunk, yet other
        # methods read self.rref. Confirm against the full file.
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)
        # NOTE(review): the 'def _lost(self):' line is not visible in this
        # chunk; the following is the notifyOnDisconnect handler body.
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self.rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        self._is_connected = False
        self.remote_host = None
339 def stop_connecting(self):
340 # used when this descriptor has been superceded by another
341 self._reconnector.stopConnecting()
343 def try_to_connect(self):
344 # used when the broker wants us to hurry up
345 self._reconnector.reset()
347 class UnknownServerTypeError(Exception):