3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
53 # what should the interface between StorageFarmBroker and IntroducerClient
55 # don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
    implements(IStorageBroker)
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    def __init__(self, tub, permute_peers, preferred_peers=()):
        """Create the broker.

        :param tub: foolscap Tub used to connect to storage servers
        :param permute_peers: must be True; non-permuted selection is not
            implemented
        :param preferred_peers: longnames of servers that get_servers_for_psi()
            should sort ahead of the rest
        """
        # NOTE(review): some lines are elided in this view, presumably the
        # assignments that save `tub` and initialize the self.servers dict.
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        self.preferred_peers = preferred_peers
        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        self.introducer_client = None
    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        # Test helper: register a server under `serverid` built from a copy
        # of announcement `ann`, marked as already connected.
        # NOTE(review): a line is elided in this view between construction
        # and the _is_connected assignment (presumably storing `rref` on s).
        s = NativeStorageServer(serverid, ann.copy())
        s._is_connected = True
        self.servers[serverid] = s
84 def test_add_server(self, serverid, s):
85 self.servers[serverid] = s
87 def use_introducer(self, introducer_client):
88 self.introducer_client = ic = introducer_client
89 ic.subscribe_to("storage", self._got_announcement)
    def _got_announcement(self, key_s, ann):
        # Handle a "storage" announcement from the IntroducerClient: wrap it
        # in a NativeStorageServer and (re)register it under its serverid.
        precondition(isinstance(key_s, str), key_s)
        precondition(key_s.startswith("v0-"), key_s)
        assert ann["service-name"] == "storage"
        s = NativeStorageServer(key_s, ann)
        serverid = s.get_serverid()
        old = self.servers.get(serverid)
        # NOTE(review): lines are elided in this view around the handling of
        # `old` — presumably an `if old:` guard and an early return when the
        # announcement is an exact duplicate. The indentation below is a
        # reconstruction; confirm against the full file.
        if old.get_announcement() == ann:
            del self.servers[serverid]
            old.stop_connecting()
        # now we forget about them and start using the new one
        self.servers[serverid] = s
        s.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.
    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            # NOTE(review): the loop body is elided in this view
            # (presumably dsc.try_to_connect()).
122 def get_servers_for_psi(self, peer_selection_index):
123 # return a list of server objects (IServers)
124 assert self.permute_peers == True
125 connected_servers = self.get_connected_servers()
126 preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
127 def _permuted(server):
128 seed = server.get_permutation_seed()
129 is_unpreferred = server not in preferred_servers
130 return (is_unpreferred, sha1(peer_selection_index + seed).digest())
131 return sorted(connected_servers, key=_permuted)
133 def get_all_serverids(self):
134 return frozenset(self.servers.keys())
136 def get_connected_servers(self):
137 return frozenset([s for s in self.servers.values() if s.is_connected()])
139 def get_known_servers(self):
140 return frozenset(self.servers.values())
    def get_nickname_for_serverid(self, serverid):
        # Return the nickname of the named server, if we know it.
        # NOTE(review): the fallback path (presumably `return None`) is
        # elided in this view.
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
147 def get_stub_server(self, serverid):
148 if serverid in self.servers:
149 return self.servers[serverid]
150 return StubServer(serverid)
    # NOTE(review): the `class StubServer:` header line is elided in this
    # view. These lines are the interior of a minimal IDisplayableServer
    # returned by get_stub_server() for serverids we know nothing else about.
    implements(IDisplayableServer)
    def __init__(self, serverid):
        self.serverid = serverid # binary tubid
    def get_serverid(self):
        # NOTE(review): lines are elided here — the return below (an
        # abbreviated base32 form) presumably belongs to an elided
        # get_name() method, while get_serverid's own body is not visible.
        return base32.b2a(self.serverid)[:8]
    def get_longname(self):
        # full base32 rendering of the binary tubid
        return base32.b2a(self.serverid)
    def get_nickname(self):
        # NOTE(review): body elided in this view.
class NativeStorageServer:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection
    @ivar version: the server's versiondict, from the most recent announcement
    @ivar nickname: the server's self-reported nickname (unicode), same
    @ivar rref: the RemoteReference, if connected, otherwise None
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    # NOTE(review): the `VERSION_DEFAULTS = {` opening line and the dict's
    # closing braces are elided in this view. These entries are the fallback
    # version data used when a server does not answer get_version() (see
    # _got_connection, which passes VERSION_DEFAULTS as the default).
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32 - 1,
          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          "available-space": None,
        "application-version": "unknown: no get_version()",
    def __init__(self, key_s, ann):
        """Build a descriptor from announcement dict `ann`.

        `key_s` is the announcement key string; _got_announcement requires
        it to start with "v0-".
        """
        # NOTE(review): several lines are elided in this view, including the
        # statement that saves `key_s` on self, an `assert` after the regex
        # match, and the if/else structure that chooses between key-based and
        # tubid-based descriptions; the description assignments below appear
        # flattened as a result.
        self.announcement = ann
        assert "anonymous-storage-FURL" in ann, ann
        furl = str(ann["anonymous-storage-FURL"])
        # the tubid is the base32 string between "pb://" and the first "@"
        m = re.match(r'pb://(\w+)@', furl)
        tubid_s = m.group(1).lower()
        self._tubid = base32.a2b(tubid_s)
        assert "permutation-seed-base32" in ann, ann
        ps = base32.a2b(str(ann["permutation-seed-base32"]))
        self._permutation_seed = ps
        self._long_description = key_s
        if key_s.startswith("v0-"):
            # remove v0- prefix from abbreviated name
            self._short_description = key_s[3:3+8]
        self._short_description = key_s[:8]
        self._long_description = tubid_s
        self._short_description = tubid_s[:6]
        # connection bookkeeping: no connection yet
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None
    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __deepcopy__(self, memodict):
        # NOTE(review): lines are elided in this view — __deepcopy__'s body
        # (presumably `return self`, per the singleton comment above) and a
        # `def __repr__(self):` line are missing; the return below is the
        # body of __repr__.
        return "<NativeStorageServer for %s>" % self.get_name()
239 def get_serverid(self):
240 return self._tubid # XXX replace with self.key_s
241 def get_permutation_seed(self):
242 return self._permutation_seed
    def get_version(self):
        # Return the server's versiondict.
        # NOTE(review): a guard line is elided in this view — the full code
        # presumably checks self.rref (and returns None) before
        # dereferencing it here.
        return self.rref.version
247 def get_name(self): # keep methodname short
248 # TODO: decide who adds [] in the short description. It should
249 # probably be the output side, not here.
250 return self._short_description
251 def get_longname(self):
252 return self._long_description
    def get_lease_seed(self):
        # NOTE(review): body elided in this view.
    def get_foolscap_write_enabler_seed(self):
        # NOTE(review): body elided in this view.
258 def get_nickname(self):
259 return self.announcement["nickname"]
260 def get_announcement(self):
261 return self.announcement
262 def get_remote_host(self):
263 return self.remote_host
264 def is_connected(self):
265 return self._is_connected
266 def get_last_connect_time(self):
267 return self.last_connect_time
268 def get_last_loss_time(self):
269 return self.last_loss_time
270 def get_announcement_time(self):
271 return self.announcement_time
    def get_available_space(self):
        """Return the server's advertised available space, or None if unknown."""
        version = self.get_version()
        # NOTE(review): a guard is elided in this view — presumably
        # returning None when `version` is None.
        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
        available_space = protocol_v1_version.get('available-space')
        if available_space is None:
            # older servers don't report available-space; fall back to the
            # maximum immutable share size as an approximation
            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
        return available_space
283 def start_connecting(self, tub, trigger_cb):
284 furl = str(self.announcement["anonymous-storage-FURL"])
285 self._trigger_cb = trigger_cb
286 self._reconnector = tub.connectTo(furl, self._got_connection)
    def _got_connection(self, rref):
        # Reconnector callback: we have a fresh RemoteReference; fetch the
        # server's version info (falling back to VERSION_DEFAULTS) before
        # declaring the connection usable in _got_versioned_service.
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        # NOTE(review): a line is elided in this view here.
        eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")
    def _got_versioned_service(self, rref, lp):
        # The remote reference now carries .version; record connection
        # state and arrange to be told about disconnection.
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)
        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        # NOTE(review): lines are elided in this view — presumably
        # including the statement that stores `rref` on self.
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)
        # NOTE(review): the `def _lost(self):` line and some preceding lines
        # are elided in this view; the statements below are the body of the
        # connection-lost handler registered via notifyOnDisconnect.
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self.rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        self._is_connected = False
        self.remote_host = None
327 def stop_connecting(self):
328 # used when this descriptor has been superceded by another
329 self._reconnector.stopConnecting()
331 def try_to_connect(self):
332 # used when the broker wants us to hurry up
333 self._reconnector.reset()
335 class UnknownServerTypeError(Exception):