3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, i.e. http or whatever). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
33 from zope.interface import implements, Interface
34 from foolscap.api import eventually
35 from allmydata.interfaces import IStorageBroker
36 from allmydata.util import idlib, log
37 from allmydata.util.assertutil import _assert, precondition
38 from allmydata.util.rrefutil import add_version_to_remote_reference
39 from allmydata.util.hashutil import sha1
41 # who is responsible for de-duplication?
43 # IC remembers the unpacked announcements it receives, to provide for late
44 # subscribers and to remove duplicates
46 # if a client subscribes after startup, will they receive old announcements?
49 # who will be responsible for signature checking?
50 # make it be IntroducerClient, so they can push the filter outwards and
51 # reduce inbound network traffic
53 # what should the interface between StorageFarmBroker and IntroducerClient look like?
55 # don't pass signatures: only pass validated blessed-objects
57 class StorageFarmBroker:
58 implements(IStorageBroker)
59 """I live on the client, and know about storage servers. For each server
60 that is participating in a grid, I either maintain a connection to it or
61 remember enough information to establish a connection to it on demand.
62 I'm also responsible for subscribing to the IntroducerClient to find out
63 about new servers as they are announced by the Introducer.
# Set up an empty broker: remember the peer-permutation flag, start with no
# unit-test servers, and with no introducer client attached yet
# (use_introducer() attaches one later).
# NOTE(review): embedded listing lines 66 and 73 are absent from this extract.
# Other methods read self.tub and self.descriptors, so presumably both are
# initialised on those missing lines -- confirm against the full file.
65 def __init__(self, tub, permute_peers):
# only the permuted ordering is supported; a falsy flag is rejected up front
67 assert permute_peers # False not implemented yet
68 self.permute_peers = permute_peers
69 # self.descriptors maps serverid -> IServerDescriptor, and keeps
70 # track of all the storage servers that we've heard about. Each
71 # descriptor manages its own Reconnector, and will give us a
72 # RemoteReference when we ask them for it.
74 # self.test_servers are statically configured from unit tests
75 self.test_servers = {} # serverid -> rref
76 self.introducer_client = None
78 # these two are used in unit tests
79 def test_add_server(self, serverid, rref):
80 self.test_servers[serverid] = rref
81 def test_add_descriptor(self, serverid, dsc):
82 self.descriptors[serverid] = dsc
84 def use_introducer(self, introducer_client):
85 self.introducer_client = ic = introducer_client
86 ic.subscribe_to("storage", self._got_announcement)
# Callback registered with the introducer client (see use_introducer) for
# "storage" announcements. Validates the arguments, compares the announcement
# with any descriptor already on file for this serverid, then builds a fresh
# NativeStorageClientDescriptor, files it in self.descriptors and starts it
# connecting through self.tub.
# NOTE(review): embedded listing lines 93, 95, 96 and 98 are absent from this
# extract. `old` comes from dict.get() and can be None, so the guard around
# the comparison/deletion (and whatever happens for a duplicate or a replaced
# descriptor) presumably lives on those missing lines -- confirm.
88 def _got_announcement(self, serverid, ann_d):
# serverid must be a 20-character str
89 precondition(isinstance(serverid, str), serverid)
90 precondition(len(serverid) == 20, serverid)
91 assert ann_d["service-name"] == "storage"
92 old = self.descriptors.get(serverid)
94 if old.get_announcement() == ann_d:
97 del self.descriptors[serverid]
99 # now we forget about them and start using the new one
100 dsc = NativeStorageClientDescriptor(serverid, ann_d)
101 self.descriptors[serverid] = dsc
# _trigger_connections is handed over as the descriptor's trigger callback
102 dsc.start_connecting(self.tub, self._trigger_connections)
103 # the descriptor will manage their own Reconnector, and each time we
104 # need servers, we'll ask them if they're connected or not.
# Passed to each descriptor's start_connecting() as its trigger callback
# (see _got_announcement); walks over every known descriptor.
# NOTE(review): the loop body (embedded listing lines 115-118) is absent from
# this extract. NativeStorageClientDescriptor.try_to_connect() is described
# as "used when the broker wants us to hurry up", so presumably it is called
# on each descriptor here -- confirm against the full file.
106 def _trigger_connections(self):
107 # when one connection is established, reset the timers on all others,
108 # to trigger a reconnection attempt in one second. This is intended
109 # to accelerate server connections when we've been offline for a
110 # while. The goal is to avoid hanging out for a long time with
111 # connections to only a subset of the servers, which would increase
112 # the chances that we'll put shares in weird places (and not update
113 # existing shares of mutable files). See #374 for more details.
114 for dsc in self.descriptors.values():
def get_servers_for_index(self, peer_selection_index):
    """Return every known server, ordered for ``peer_selection_index``.

    First cut: the result is a list of (peerid, versioned-rref) tuples taken
    from get_all_servers(), sorted by the SHA-1 digest of the selection
    index concatenated with each peerid.
    """
    # only the permuted ordering is implemented
    assert self.permute_peers == True

    def _permuted_position(entry):
        peerid = entry[0]
        return sha1(peer_selection_index + peerid).digest()

    candidates = self.get_all_servers()
    return sorted(candidates, key=_permuted_position)
# Collect (serverid, rref) pairs from the statically configured test servers
# and from the rref each known descriptor hands out via get_rref(), and
# freeze them into a frozenset. Descriptors are processed second, so for a
# serverid present in both maps the descriptor's rref is the one stored.
# NOTE(review): embedded listing lines 128, 133 and 137-138 are absent from
# this extract: the initialisation of `servers`, whatever sits between
# get_rref() and the store (presumably a check that the descriptor actually
# has an rref), and the return statement are not visible -- confirm.
126 def get_all_servers(self):
127 # return a frozenset of (peerid, versioned-rref) tuples
129 for serverid,rref in self.test_servers.items():
130 servers[serverid] = rref
131 for serverid,dsc in self.descriptors.items():
132 rref = dsc.get_rref()
134 servers[serverid] = rref
135 result = frozenset(servers.items())
# sanity check: there can never be more servers than known serverids
136 _assert(len(result) <= len(self.get_all_serverids()), result, self.get_all_serverids())
# Return a frozenset holding every serverid we know about: the keys of
# self.test_servers plus the keys of self.descriptors.
# NOTE(review): embedded listing line 140 is absent from this extract; the
# initialisation of `serverids` (something with an update() method,
# presumably an empty set) is not visible -- confirm.
139 def get_all_serverids(self):
141 serverids.update(self.test_servers.keys())
142 serverids.update(self.descriptors.keys())
143 return frozenset(serverids)
145 def get_all_descriptors(self):
146 return sorted(self.descriptors.values(),
147 key=lambda dsc: dsc.get_serverid())
149 def get_nickname_for_serverid(self, serverid):
150 if serverid in self.descriptors:
151 return self.descriptors[serverid].get_nickname()
# zope.interface declaration for server descriptors; the only method visible
# here is start_connecting(tub, trigger_cb). NativeStorageClientDescriptor
# declares that it implements this interface.
# NOTE(review): embedded listing lines 157-162 (the method's docstring/body
# and anything else the interface declares) are absent from this extract.
155 class IServerDescriptor(Interface):
# interface methods are declared without `self`
156 def start_connecting(tub, trigger_cb):
163 class NativeStorageClientDescriptor:
164 """I hold information about a storage server that we want to connect to.
165 If we are connected, I hold the RemoteReference, their host address, and
166 their version information. I remember information about when we were
167 last connected too, even if we aren't currently connected.
169 @ivar announcement_time: when we first heard about this service
170 @ivar last_connect_time: when we last established a connection
171 @ivar last_loss_time: when we last lost a connection
173 @ivar version: the server's versiondict, from the most recent announcement
174 @ivar nickname: the server's self-reported nickname (unicode), same
176 @ivar rref: the RemoteReference, if connected, otherwise None
177 @ivar remote_host: the IAddress, if connected, otherwise None
179 implements(IServerDescriptor)
# Fragment of a version dictionary literal: per-protocol capability flags for
# the v1 storage protocol plus a placeholder application version.
# NOTE(review): the assignment line and the closing braces (embedded listing
# lines 181, 186, 188) are absent from this extract. _got_connection passes
# self.VERSION_DEFAULTS to add_version_to_remote_reference() as the default,
# so presumably this literal is that class attribute -- confirm.
182 "http://allmydata.org/tahoe/protocols/storage/v1" :
183 { "maximum-immutable-share-size": 2**32,
184 "tolerates-immutable-read-overrun": False,
185 "delete-mutable-shares-with-zero-length-writev": False,
187 "application-version": "unknown: no get_version()",
# Record the server's id and announcement, note when we first heard about it,
# and start in the "never connected" state: no connect/loss timestamps, no
# remote host, no Reconnector and no trigger callback until
# start_connecting() is called.
# NOTE(review): embedded listing lines 194 and 200 are absent from this
# extract; the class docstring lists an `rref` ivar that is None when not
# connected, so presumably it is initialised on a missing line -- confirm.
# NOTE(review): `time` is used below but no `import time` is visible among
# the imports in this listing -- confirm it exists in the full file.
190 def __init__(self, serverid, ann_d, min_shares=1):
191 self.serverid = serverid
192 self.announcement = ann_d
193 self.min_shares = min_shares
# abbreviated form of the serverid, used in the log messages below
195 self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
196 self.announcement_time = time.time()
197 self.last_connect_time = None
198 self.last_loss_time = None
199 self.remote_host = None
201 self._reconnector = None
202 self._trigger_cb = None
# Accessor used by StorageFarmBroker.get_all_descriptors() as its sort key.
# NOTE(review): the body (embedded listing line 205) is absent from this
# extract; presumably it returns self.serverid as stored by __init__ --
# confirm against the full file.
204 def get_serverid(self):
207 def get_nickname(self):
208 return self.announcement["nickname"].decode("utf-8")
209 def get_announcement(self):
210 return self.announcement
211 def get_remote_host(self):
212 return self.remote_host
213 def get_last_connect_time(self):
214 return self.last_connect_time
215 def get_last_loss_time(self):
216 return self.last_loss_time
217 def get_announcement_time(self):
218 return self.announcement_time
220 def start_connecting(self, tub, trigger_cb):
221 furl = self.announcement["FURL"]
222 self._trigger_cb = trigger_cb
223 self._reconnector = tub.connectTo(furl, self._got_connection)
# Callback handed to tub.connectTo() in start_connecting(); receives the
# freshly connected rref. Logs the event, schedules the trigger callback via
# foolscap's eventually(), then asks add_version_to_remote_reference() to
# attach version information (falling back to self.VERSION_DEFAULTS) before
# handing the rref on to _got_versioned_service. Failures in that chain are
# sent to log.err.
# NOTE(review): embedded listing line 229 is absent from this extract.
# __init__ sets _trigger_cb to None, so presumably the missing line guards
# the eventually() call against a missing callback -- confirm.
225 def _got_connection(self, rref):
# lp is passed on so that later log messages can name this one as parent
226 lp = log.msg(format="got connection to %(serverid)s, getting versions",
227 serverid=self.serverid_s,
228 facility="tahoe.storage_broker", umid="coUECQ")
230 eventually(self._trigger_cb)
231 default = self.VERSION_DEFAULTS
232 d = add_version_to_remote_reference(rref, default)
233 d.addCallback(self._got_versioned_service, lp)
234 d.addErrback(log.err, format="storageclient._got_connection",
235 serverid=self.serverid_s, umid="Sdq3pg")
# Second half of connection setup, called once version information has been
# attached to the rref (see _got_connection). Logs the version, records the
# connect time and the peer address, and registers self._lost to be called
# when the connection goes away.
# NOTE(review): embedded listing lines 242 and 245 are absent from this
# extract; the class docstring lists an `rref` ivar holding the
# RemoteReference while connected, so presumably it is stored on a missing
# line -- confirm.
237 def _got_versioned_service(self, rref, lp):
238 log.msg(format="%(serverid)s provided version info %(version)s",
239 serverid=self.serverid_s, version=rref.version,
240 facility="tahoe.storage_broker", umid="SWmJYg",
241 level=log.NOISY, parent=lp)
243 self.last_connect_time = time.time()
244 self.remote_host = rref.getPeer()
246 rref.notifyOnDisconnect(self._lost)
# Disconnect handling: log the loss, record when it happened, and forget the
# remote host.
# NOTE(review): the `def` line for this body (embedded listing line 251) and
# line 256 are absent from this extract. Presumably this is the `_lost`
# callback that _got_versioned_service registers via
# rref.notifyOnDisconnect(), and the missing line 256 clears the stored
# rref -- confirm against the full file.
252 log.msg(format="lost connection to %(serverid)s",
253 serverid=self.serverid_s,
254 facility="tahoe.storage_broker", umid="zbRllw")
255 self.last_loss_time = time.time()
257 self.remote_host = None
259 def stop_connecting(self):
260 # used when this descriptor has been superceded by another
261 self._reconnector.stopConnecting()
263 def try_to_connect(self):
264 # used when the broker wants us to hurry up
265 self._reconnector.reset()
267 class UnknownServerTypeError(Exception):