3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
33 from zope.interface import implements
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker, IDisplayableServer, IServer
36 from allmydata.util import log, base32
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
53 # what should the interface between StorageFarmBroker and IntroducerClient look like?
55 # don't pass signatures: only pass validated blessed-objects
57 class StorageFarmBroker:
58 implements(IStorageBroker)
59 """I live on the client, and know about storage servers. For each server
60 that is participating in a grid, I either maintain a connection to it or
61 remember enough information to establish a connection to it on demand.
62 I'm also responsible for subscribing to the IntroducerClient to find out
63 about new servers as they are announced by the Introducer.
65 def __init__(self, tub, permute_peers):
67 assert permute_peers # False not implemented yet
68 self.permute_peers = permute_peers
69 # self.servers maps serverid -> IServer, and keeps track of all the
70 # storage servers that we've heard about. Each descriptor manages its
71 # own Reconnector, and will give us a RemoteReference when we ask
74 self.introducer_client = None
76 # these two are used in unit tests
77 def test_add_rref(self, serverid, rref, ann):
78 s = NativeStorageServer(serverid, ann.copy())
80 self.servers[serverid] = s
def test_add_server(self, serverid, s):
    """Unit-test hook: store ``s`` in ``self.servers`` under ``serverid``.

    Whatever was previously stored under ``serverid`` is overwritten.
    """
    registry = self.servers
    registry[serverid] = s
def use_introducer(self, introducer_client):
    """Remember ``introducer_client`` and subscribe to "storage" announcements.

    The subscription callback is ``self._got_announcement``.
    """
    self.introducer_client = introducer_client
    introducer_client.subscribe_to("storage", self._got_announcement)
89 def _got_announcement(self, key_s, ann):
91 precondition(isinstance(key_s, str), key_s)
92 precondition(key_s.startswith("v0-"), key_s)
93 assert ann["service-name"] == "storage"
94 s = NativeStorageServer(key_s, ann)
95 serverid = s.get_serverid()
96 old = self.servers.get(serverid)
98 if old.get_announcement() == ann:
101 del self.servers[serverid]
102 old.stop_connecting()
103 # now we forget about them and start using the new one
104 self.servers[serverid] = s
105 s.start_connecting(self.tub, self._trigger_connections)
106 # each descriptor will manage its own Reconnector, and each time we
107 # need servers, we'll ask them if they're connected or not.
109 def _trigger_connections(self):
110 # when one connection is established, reset the timers on all others,
111 # to trigger a reconnection attempt in one second. This is intended
112 # to accelerate server connections when we've been offline for a
113 # while. The goal is to avoid hanging out for a long time with
114 # connections to only a subset of the servers, which would increase
115 # the chances that we'll put shares in weird places (and not update
116 # existing shares of mutable files). See #374 for more details.
117 for dsc in self.servers.values():
def get_servers_for_psi(self, peer_selection_index):
    """Return the connected servers (IServers) as a permuted list.

    Servers are sorted by the SHA-1 digest of ``peer_selection_index``
    concatenated with each server's permutation seed.
    """
    # only the permuted ordering is supported
    assert self.permute_peers == True
    candidates = self.get_connected_servers()
    return sorted(
        candidates,
        key=lambda srv: sha1(peer_selection_index
                             + srv.get_permutation_seed()).digest())
def get_all_serverids(self):
    """Return a frozenset of every serverid that is a key of ``self.servers``."""
    known_ids = frozenset(self.servers)
    return known_ids
def get_connected_servers(self):
    """Return a frozenset of the known servers whose ``get_rref()`` is truthy."""
    connected = set()
    for server in self.servers.values():
        if server.get_rref():
            connected.add(server)
    return frozenset(connected)
def get_known_servers(self):
    """Return a frozenset of every server object stored in ``self.servers``."""
    everyone = self.servers.values()
    return frozenset(everyone)
137 def get_nickname_for_serverid(self, serverid):
138 if serverid in self.servers:
139 return self.servers[serverid].get_nickname()
def get_stub_server(self, serverid):
    """Return the server stored under ``serverid``, or a new StubServer.

    A ``StubServer(serverid)`` is built only when ``serverid`` is not a key
    of ``self.servers``.
    """
    if serverid not in self.servers:
        return StubServer(serverid)
    return self.servers[serverid]
148 implements(IDisplayableServer)
149 def __init__(self, serverid):
150 self.serverid = serverid # binary tubid
151 def get_serverid(self):
154 return base32.b2a(self.serverid)[:8]
155 def get_longname(self):
156 return base32.b2a(self.serverid)
157 def get_nickname(self):
160 class NativeStorageServer:
161 """I hold information about a storage server that we want to connect to.
162 If we are connected, I hold the RemoteReference, their host address, and
163 their version information. I remember information about when we were
164 last connected too, even if we aren't currently connected.
166 @ivar announcement_time: when we first heard about this service
167 @ivar last_connect_time: when we last established a connection
168 @ivar last_loss_time: when we last lost a connection
170 @ivar version: the server's versiondict, from the most recent announcement
171 @ivar nickname: the server's self-reported nickname (unicode), same
173 @ivar rref: the RemoteReference, if connected, otherwise None
174 @ivar remote_host: the IAddress, if connected, otherwise None
179 "http://allmydata.org/tahoe/protocols/storage/v1" :
180 { "maximum-immutable-share-size": 2**32,
181 "tolerates-immutable-read-overrun": False,
182 "delete-mutable-shares-with-zero-length-writev": False,
184 "application-version": "unknown: no get_version()",
187 def __init__(self, key_s, ann, min_shares=1):
189 self.announcement = ann
190 self.min_shares = min_shares
192 assert "anonymous-storage-FURL" in ann, ann
193 furl = str(ann["anonymous-storage-FURL"])
194 m = re.match(r'pb://(\w+)@', furl)
196 tubid_s = m.group(1).lower()
197 self._tubid = base32.a2b(tubid_s)
198 assert "permutation-seed-base32" in ann, ann
199 ps = base32.a2b(str(ann["permutation-seed-base32"]))
200 self._permutation_seed = ps
203 self._long_description = key_s
204 if key_s.startswith("v0-"):
205 # remove v0- prefix from abbreviated name
206 self._short_description = key_s[3:3+8]
208 self._short_description = key_s[:8]
210 self._long_description = tubid_s
211 self._short_description = tubid_s[:6]
213 self.announcement_time = time.time()
214 self.last_connect_time = None
215 self.last_loss_time = None
216 self.remote_host = None
218 self._reconnector = None
219 self._trigger_cb = None
221 # Special methods used by copy.copy() and copy.deepcopy(). When those are
222 # used in allmydata.immutable.filenode to copy CheckResults during
223 # repair, we want it to treat the IServer instances as singletons, and
224 # not attempt to duplicate them..
227 def __deepcopy__(self, memodict):
231 return "<NativeStorageServer for %s>" % self.get_name()
def get_serverid(self):
    """Return the value stored in ``self._tubid``."""
    # XXX replace with self.key_s
    return self._tubid
def get_permutation_seed(self):
    """Return the value stored in ``self._permutation_seed``."""
    seed = self._permutation_seed
    return seed
236 def get_version(self):
238 return self.rref.version
def get_name(self):  # keep methodname short
    """Return the abbreviated description (``self._short_description``)."""
    # TODO: decide who adds [] in the short description. It should
    # probably be the output side, not here.
    short = self._short_description
    return short
def get_longname(self):
    """Return the full description (``self._long_description``)."""
    full = self._long_description
    return full
246 def get_lease_seed(self):
248 def get_foolscap_write_enabler_seed(self):
def get_nickname(self):
    """Return the announcement's "nickname" entry, decoded as UTF-8."""
    raw_nickname = self.announcement["nickname"]
    return raw_nickname.decode("utf-8")
def get_announcement(self):
    """Return the announcement object held in ``self.announcement``."""
    ann = self.announcement
    return ann
def get_remote_host(self):
    """Return the current value of ``self.remote_host``."""
    host = self.remote_host
    return host
def get_last_connect_time(self):
    """Return the current value of ``self.last_connect_time``."""
    when = self.last_connect_time
    return when
def get_last_loss_time(self):
    """Return the current value of ``self.last_loss_time``."""
    when = self.last_loss_time
    return when
def get_announcement_time(self):
    """Return the value stored in ``self.announcement_time``."""
    when = self.announcement_time
    return when
def start_connecting(self, tub, trigger_cb):
    """Ask ``tub`` to connect to this server's anonymous storage FURL.

    ``trigger_cb`` is saved in ``self._trigger_cb``. The object returned by
    ``tub.connectTo()`` is saved in ``self._reconnector``; ``connectTo`` is
    given ``self._got_connection`` as its callback.
    """
    storage_furl = str(self.announcement["anonymous-storage-FURL"])
    self._trigger_cb = trigger_cb
    reconnector = tub.connectTo(storage_furl, self._got_connection)
    self._reconnector = reconnector
269 def _got_connection(self, rref):
270 lp = log.msg(format="got connection to %(name)s, getting versions",
271 name=self.get_name(),
272 facility="tahoe.storage_broker", umid="coUECQ")
274 eventually(self._trigger_cb)
275 default = self.VERSION_DEFAULTS
276 d = add_version_to_remote_reference(rref, default)
277 d.addCallback(self._got_versioned_service, lp)
278 d.addErrback(log.err, format="storageclient._got_connection",
279 name=self.get_name(), umid="Sdq3pg")
281 def _got_versioned_service(self, rref, lp):
282 log.msg(format="%(name)s provided version info %(version)s",
283 name=self.get_name(), version=rref.version,
284 facility="tahoe.storage_broker", umid="SWmJYg",
285 level=log.NOISY, parent=lp)
287 self.last_connect_time = time.time()
288 self.remote_host = rref.getPeer()
290 rref.notifyOnDisconnect(self._lost)
296 log.msg(format="lost connection to %(name)s", name=self.get_name(),
297 facility="tahoe.storage_broker", umid="zbRllw")
298 self.last_loss_time = time.time()
300 self.remote_host = None
def stop_connecting(self):
    """Stop the reconnection attempts for this server, if any were started.

    Used when this descriptor has been superseded by another. When
    ``self._reconnector`` is still None (``start_connecting()`` was never
    called on this instance) there is nothing to stop, so this is a no-op
    instead of an AttributeError.
    """
    reconnector = self._reconnector
    if reconnector is not None:
        reconnector.stopConnecting()
def try_to_connect(self):
    """Reset the reconnector so that it retries promptly.

    Used when the broker wants us to hurry up. When ``self._reconnector``
    is still None (``start_connecting()`` was never called on this
    instance) there is nothing to reset, so this is a no-op instead of an
    AttributeError.
    """
    reconnector = self._reconnector
    if reconnector is not None:
        reconnector.reset()
310 class UnknownServerTypeError(Exception):