3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
# instead (which might do something else, e.g. speak HTTP). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
33 from zope.interface import implements, Interface
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker
36 from allmydata.util import idlib, log
37 from allmydata.util.assertutil import precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
# what should the interface between StorageFarmBroker and IntroducerClient
# look like?
55 # don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    # The docstring above must be the first statement of the class body: if
    # it came after implements(...) it would be a discarded expression and
    # StorageFarmBroker.__doc__ would be None.
    implements(IStorageBroker)

    def __init__(self, tub, permute_peers):
        """
        @param tub: the Tub handed to each server descriptor's
            start_connecting(), used to reach announced storage servers
        @param permute_peers: must be true; unpermuted server selection is
            not implemented yet
        """
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # for it.
        self.servers = {}
        # self.test_servers are statically configured from unit tests
        self.test_servers = {} # serverid -> rref
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_server(self, serverid, rref):
        # register an already-connected RemoteReference under serverid
        self.test_servers[serverid] = rref
    def test_add_descriptor(self, serverid, dsc):
        # register a pre-built server descriptor under serverid
        self.servers[serverid] = dsc

    def use_introducer(self, introducer_client):
        """Remember the IntroducerClient and subscribe to its "storage"
        announcements, which are delivered to _got_announcement."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, serverid, ann_d):
        """Handle one "storage" announcement from the IntroducerClient.

        An announcement identical to the one we already hold for serverid is
        ignored. A different announcement replaces the old descriptor: the
        old one is told to stop connecting, and a new NativeStorageServer is
        created and started.

        @param serverid: the 20-byte binary server id
        @param ann_d: the announcement dict; its "service-name" must be
            "storage"
        """
        precondition(isinstance(serverid, str), serverid)
        precondition(len(serverid) == 20, serverid)
        assert ann_d["service-name"] == "storage"
        old = self.servers.get(serverid)
        if old is not None:
            if old.get_announcement() == ann_d:
                return # duplicate
            # replacement: shut down the superseded descriptor's Reconnector
            # so it does not keep a second connection alive
            del self.servers[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        dsc = NativeStorageServer(serverid, ann_d)
        self.servers[serverid] = dsc
        dsc.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            dsc.try_to_connect()

    def get_servers_for_psi(self, peer_selection_index):
        """Return a list of the connected servers (IServers), ordered by
        sha1(peer_selection_index + server.get_permutation_seed())."""
        assert self.permute_peers # unpermuted order is not implemented
        def _permuted(server):
            seed = server.get_permutation_seed()
            return sha1(peer_selection_index + seed).digest()
        return sorted(self.get_connected_servers(), key=_permuted)

    def get_all_serverids(self):
        """Return a frozenset of every serverid we know about, whether it
        was announced by the Introducer or added by a unit test."""
        return frozenset(self.test_servers) | frozenset(self.servers)

    def get_connected_servers(self):
        """Return a frozenset of the known servers whose get_rref() is
        non-empty, i.e. those we currently hold a RemoteReference for."""
        return frozenset([s for s in self.get_known_servers()
                          if s.get_rref()])

    def get_known_servers(self):
        """Return every known server (IServer) as a list sorted by serverid.

        Unit-test servers are wrapped in a NativeStorageServer with an empty
        announcement and their RemoteReference pre-attached as .rref.
        """
        servers = []
        for serverid, rref in self.test_servers.items():
            s = NativeStorageServer(serverid, {})
            s.rref = rref
            servers.append(s)
        servers.extend(self.servers.values())
        return sorted(servers, key=lambda s: s.get_serverid())

    def get_nickname_for_serverid(self, serverid):
        """Return the announced nickname of an introducer-announced server,
        or None when self.servers has no descriptor for serverid."""
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None
class IServer(Interface):
    """I live in the client, and represent a single server."""
    def start_connecting(tub, trigger_cb):
        """Start connecting to the server through C{tub}.

        C{trigger_cb} is a zero-argument callable to invoke when a
        connection is established, so the broker can prompt its other
        servers to retry their connections sooner.
        """
class NativeStorageServer:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar serverid: the server's binary id (also used as its permutation seed)
    @ivar announcement: the announcement dict I was built from; the server's
        self-reported nickname is its "nickname" entry (see get_nickname)
    @ivar announcement_time: when we first heard about this service
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar rref: the RemoteReference, if connected, otherwise None. Once
        connected, the server's versiondict is available as C{rref.version}
        (attached by add_version_to_remote_reference, with VERSION_DEFAULTS
        passed as the default)
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    implements(IServer)

    # versiondict handed to add_version_to_remote_reference as the default
    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32,
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, serverid, ann_d, min_shares=1):
        """
        @param serverid: the server's binary id
        @param ann_d: the announcement dict (empty for unit-test servers);
            start_connecting() reads its "FURL" entry
        @param min_shares: stored as self.min_shares
        """
        self.serverid = serverid
        self.announcement = ann_d
        self.min_shares = min_shares

        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        self._reconnector = None
        self._trigger_cb = None

    def get_serverid(self):
        return self.serverid
    def get_permutation_seed(self):
        # the broker permutes servers by sha1(peer_selection_index + seed)
        return self.serverid

    def get_nickname(self):
        return self.announcement["nickname"].decode("utf-8")
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time
    def get_rref(self):
        # the RemoteReference while connected, None otherwise
        return self.rref

    def start_connecting(self, tub, trigger_cb):
        """Ask the tub to connect (and keep reconnecting) to the announced
        FURL. trigger_cb is scheduled each time a connection arrives."""
        furl = self.announcement["FURL"]
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(serverid)s, getting versions",
                     serverid=self.serverid_s,
                     facility="tahoe.storage_broker", umid="coUECQ")
        # let the broker nudge the other servers to reconnect sooner; a
        # caller may have passed no callback at all
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     serverid=self.serverid_s, umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        log.msg(format="%(serverid)s provided version info %(version)s",
                serverid=self.serverid_s, version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        rref.notifyOnDisconnect(self._lost)

    def _lost(self):
        # disconnect notification: forget the per-connection state but keep
        # the timestamps
        log.msg(format="lost connection to %(serverid)s",
                serverid=self.serverid_s,
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another; a
        # descriptor that never started connecting has nothing to stop
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up; no-op if we never
        # started connecting
        if self._reconnector is not None:
            self._reconnector.reset()
265 class UnknownServerTypeError(Exception):