"""
I contain the client-side code which speaks to storage servers, in particular
the foolscap-based server implemented in src/allmydata/storage/*.py .
"""
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
import re
import time

from zope.interface import implements
from foolscap.api import eventually

from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
# what should the interface between StorageFarmBroker and IntroducerClient
# look like?
55 # don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    implements(IStorageBroker)

    def __init__(self, tub, permute_peers):
        """Create a broker that does not know about any servers yet.

        @param tub: the Tub passed to each server's start_connecting()
        @param permute_peers: must be true; the non-permuted mode is not
                              implemented yet
        """
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # for it.
        self.servers = {}
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        """Register a server that is pretended to be already connected
        through 'rref'. The announcement is copied, so the caller's dict is
        not shared with the new server object."""
        s = NativeStorageServer(serverid, ann.copy())
        # without the rref, a server marked as connected would have nothing
        # to report a version from
        s.rref = rref
        s._is_connected = True
        self.servers[serverid] = s

    def test_add_server(self, serverid, s):
        """Register an arbitrary, caller-supplied server object."""
        self.servers[serverid] = s

    def use_introducer(self, introducer_client):
        """Remember the introducer client and subscribe to its "storage"
        announcements."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, key_s, ann):
        """Handle one "storage" announcement.

        A new server is recorded and asked to start connecting. An
        announcement identical to the one we already hold for that serverid
        is ignored. A different announcement for a known serverid replaces
        the old server object, which is told to stop connecting.
        """
        # NativeStorageServer falls back to a tubid-based description when it
        # has no key, so only validate the key when one was supplied.
        if key_s is not None:
            precondition(isinstance(key_s, str), key_s)
            precondition(key_s.startswith("v0-"), key_s)
        assert ann["service-name"] == "storage"
        s = NativeStorageServer(key_s, ann)
        serverid = s.get_serverid()
        old = self.servers.get(serverid)
        if old is not None:
            if old.get_announcement() == ann:
                return # duplicate: keep the existing server and connection
            # replacement
            del self.servers[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        self.servers[serverid] = s
        s.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            dsc.try_to_connect()

    def get_servers_for_psi(self, peer_selection_index):
        """Return a list of the connected server objects (IServers), sorted
        by the SHA-1 digest of peer_selection_index + each server's
        permutation seed."""
        assert self.permute_peers == True
        def _permuted(server):
            seed = server.get_permutation_seed()
            return sha1(peer_selection_index + seed).digest()
        return sorted(self.get_connected_servers(), key=_permuted)

    def get_all_serverids(self):
        """Return a frozenset of every serverid we have heard about."""
        return frozenset(self.servers)

    def get_connected_servers(self):
        """Return a frozenset of the servers that report is_connected()."""
        return frozenset(s for s in self.servers.values() if s.is_connected())

    def get_known_servers(self):
        """Return a frozenset of all server objects, connected or not."""
        return frozenset(self.servers.values())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname of the given server, or None if we have never
        heard of that serverid."""
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None

    def get_stub_server(self, serverid):
        """Return the known server object for serverid, or a StubServer
        placeholder when we have never heard of it."""
        if serverid in self.servers:
            return self.servers[serverid]
        return StubServer(serverid)
class StubServer:
    """I stand in for a server that the broker has no record of. All I know
    is the serverid, so every name I report is derived from it."""
    implements(IDisplayableServer)

    def __init__(self, serverid):
        self.serverid = serverid # binary tubid

    def get_serverid(self):
        """Return the binary serverid I was created with."""
        return self.serverid

    def get_name(self):
        """Return the abbreviated name: the first 8 characters of the
        base32-encoded serverid."""
        return base32.b2a(self.serverid)[:8]

    def get_longname(self):
        """Return the full base32-encoded serverid."""
        return base32.b2a(self.serverid)

    def get_nickname(self):
        """Return a placeholder nickname."""
        # NOTE(review): there is no announcement to read a nickname from, so
        # a fixed placeholder is returned -- confirm "?" is what the status
        # pages expect.
        return "?"
class NativeStorageServer:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar announcement: the announcement dictionary I was created from; the
                        server's self-reported nickname is read from it
    @ivar rref: the most recent RemoteReference, or None if we have never
                been connected. It is left in place (stale) after the
                connection is lost; its .version holds the versiondict.
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    implements(IServer)

    # used as the version information for servers that cannot report their
    # own (passed to add_version_to_remote_reference)
    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32 - 1,
          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          "available-space": None,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, key_s, ann):
        """
        @param key_s: the announcement key string ("v0-..."), or a false
                      value when there is none, in which case my names are
                      derived from the tubid instead
        @param ann: the announcement dictionary; it must contain
                    "anonymous-storage-FURL" and "permutation-seed-base32"
        """
        self.key_s = key_s
        self.announcement = ann

        assert "anonymous-storage-FURL" in ann, ann
        furl = str(ann["anonymous-storage-FURL"])
        m = re.match(r'pb://(\w+)@', furl)
        # a FURL that does not start with pb://TUBID@ has no tubid to extract
        assert m, furl
        tubid_s = m.group(1).lower()
        self._tubid = base32.a2b(tubid_s)
        assert "permutation-seed-base32" in ann, ann
        ps = base32.a2b(str(ann["permutation-seed-base32"]))
        self._permutation_seed = ps

        if key_s:
            self._long_description = key_s
            if key_s.startswith("v0-"):
                # remove v0- prefix from abbreviated name
                self._short_description = key_s[3:3+8]
            else:
                self._short_description = key_s[:8]
        else:
            self._long_description = tubid_s
            self._short_description = tubid_s[:6]

        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them.
    def __copy__(self):
        return self
    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<NativeStorageServer for %s>" % self.get_name()
    def get_serverid(self):
        return self._tubid # XXX replace with self.key_s
    def get_permutation_seed(self):
        return self._permutation_seed
    def get_version(self):
        """Return the versiondict attached to my RemoteReference, or None if
        we have never been connected."""
        if self.rref is not None:
            return self.rref.version
        return None
    def get_name(self): # keep methodname short
        # TODO: decide who adds [] in the short description. It should
        # probably be the output side, not here.
        return self._short_description
    def get_longname(self):
        return self._long_description
    def get_lease_seed(self):
        # NOTE(review): presumably the binary tubid, the same value that
        # get_serverid() returns -- confirm against the lease code.
        return self._tubid
    def get_foolscap_write_enabler_seed(self):
        # NOTE(review): presumably the binary tubid as well -- confirm
        # against the mutable-file write-enabler code.
        return self._tubid

    def get_nickname(self):
        return self.announcement["nickname"]
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def is_connected(self):
        return self._is_connected
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time

    def get_available_space(self):
        """Return the server's 'available-space' value, falling back to its
        'maximum-immutable-share-size' when that is not reported. Return
        None when no version information is available at all."""
        version = self.get_version()
        if version is None:
            return None
        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
        available_space = protocol_v1_version.get('available-space')
        if available_space is None:
            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
        return available_space

    def start_connecting(self, tub, trigger_cb):
        """Ask the tub to connect to my FURL.

        @param tub: the Tub whose connectTo() creates my Reconnector
        @param trigger_cb: a no-argument callable, scheduled each time a
                           connection is established
        """
        furl = str(self.announcement["anonymous-storage-FURL"])
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        # there is no callback to run until start_connecting() has set one
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        """Return my RemoteReference. It is None before the first
        connection, and may be stale after a disconnect."""
        return self.rref

    def _lost(self):
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self.rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        self._is_connected = False
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        # (nothing to stop if start_connecting() was never called)
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        # (nothing to hurry if start_connecting() was never called)
        if self._reconnector is not None:
            self._reconnector.reset()
331 class UnknownServerTypeError(Exception):