"""
I contain the client-side code which speaks to storage servers, in particular
the foolscap-based server implemented in src/allmydata/storage/*.py .
"""
# Roadmap:
#
# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
# create it, change uploader/servermap to get rrefs from it. StorageFarmBroker
# calls IntroducerClient.subscribe_to. StorageFarmBroker hides descriptors and
# passes rrefs to clients. webapi status pages call
# broker.get_info_about_serverid.
#
# 2: move get_info methods to the descriptor; webapi status pages call
# broker.get_descriptor_for_serverid().get_info
#
# 3 (later?): store descriptors in UploadResults/etc. instead of serverids;
# webapi status pages call descriptor.get_info and don't use storage_broker
#
# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
# optional. This closes #467.
#
# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
# clients. Clients stop doing callRemote() and use NativeStorageClient methods
# instead (which might do something else, e.g. http or whatever). The
# introducer and tahoe.cfg only create NativeStorageClients for now.
#
# 6: implement other sorts of IStorageClient classes: S3, etc.
import hashlib
import time

from zope.interface import implements, Interface
from foolscap.api import eventually
from allmydata.interfaces import IStorageBroker
from allmydata.util import idlib, log
from allmydata.util.rrefutil import add_version_to_remote_reference
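# Who is responsible for de-duplication?
#  The IntroducerClient (IC) remembers the unpacked announcements it
#  receives, to provide for late subscribers and to remove duplicates.
#  StorageFarmBroker._got_announcement also ignores an announcement that is
#  identical to the one it already holds for that server.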
# If a client subscribes after startup, will it receive old announcements?
#  (The IC's memory of past announcements, above, is meant to provide for
#  late subscribers.)
# Who will be responsible for signature checking?
#  Make it the IntroducerClient, so it can push the filter outwards and
#  reduce inbound network traffic.
# What should the interface between StorageFarmBroker and IntroducerClient
# look like?
#  Don't pass signatures: only pass validated, blessed objects.
class StorageFarmBroker:
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    # implements() comes after the docstring: only the first statement of a
    # class body becomes __doc__.
    implements(IStorageBroker)

    def __init__(self, tub, permute_peers):
        """
        @param tub: the Tub whose connectTo() each descriptor uses to reach
                    its server
        @param permute_peers: must be true; the non-permuted server ordering
                              is not implemented yet
        """
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # self.descriptors maps serverid -> IServerDescriptor, and keeps
        # track of all the storage servers that we've heard about. Each
        # descriptor manages its own Reconnector, and will give us a
        # RemoteReference when we ask them for it.
        self.descriptors = {}
        # self.test_servers are statically configured from unit tests
        self.test_servers = {} # serverid -> rref
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_server(self, serverid, rref):
        self.test_servers[serverid] = rref
    def test_add_descriptor(self, serverid, dsc):
        self.descriptors[serverid] = dsc

    def use_introducer(self, introducer_client):
        """Subscribe to "storage" announcements from introducer_client."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, serverid, ann_d):
        """Handle one "storage" announcement.

        An announcement identical to the one we already hold for serverid is
        ignored. A different one replaces the old descriptor: the old
        descriptor stops connecting, and a new NativeStorageClientDescriptor
        is created and starts connecting through self.tub.
        """
        assert ann_d["service-name"] == "storage"
        old = self.descriptors.get(serverid)
        if old is not None:
            if old.get_announcement() == ann_d:
                return # duplicate
            # replacement: retire the superseded descriptor
            del self.descriptors[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        dsc = NativeStorageClientDescriptor(serverid, ann_d)
        self.descriptors[serverid] = dsc
        dsc.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.descriptors.values():
            dsc.try_to_connect()

    def get_servers_for_index(self, peer_selection_index):
        """Return a list of (serverid, versioned-rref) tuples for every
        server in get_all_servers(), in the permuted order for
        peer_selection_index.

        Servers are sorted by SHA-1(peer_selection_index + serverid), so each
        index yields its own deterministic ordering of the servers.
        """
        assert self.permute_peers
        key = peer_selection_index
        def permuted_position(server):
            serverid = server[0]
            return hashlib.sha1(key + serverid).digest()
        return sorted(self.get_all_servers(), key=permuted_position)

    def get_all_servers(self):
        """Return a frozenset of (serverid, versioned-rref) tuples.

        Includes every statically configured test server, plus every
        descriptor whose get_rref() is not None. If a serverid appears in
        both, the descriptor's rref wins.
        """
        servers = dict(self.test_servers)
        for serverid, dsc in self.descriptors.items():
            rref = dsc.get_rref()
            if rref is not None: # skip servers we aren't connected to
                servers[serverid] = rref
        return frozenset(servers.items())

    def get_all_serverids(self):
        """Return a frozenset of every serverid we know about (test servers
        and announced descriptors), whether or not it is connected."""
        serverids = set(self.test_servers)
        serverids.update(self.descriptors)
        return frozenset(serverids)

    def get_all_descriptors(self):
        """Return all descriptors, sorted by serverid."""
        return sorted(self.descriptors.values(),
                      key=lambda dsc: dsc.get_serverid())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname from serverid's descriptor, or None when we
        have no descriptor for it (e.g. test servers)."""
        if serverid not in self.descriptors:
            return None
        return self.descriptors[serverid].get_nickname()
class IServerDescriptor(Interface):
    """The per-server object that StorageFarmBroker stores in its
    descriptors table and queries for connection state."""

    def start_connecting(tub, trigger_cb):
        """Start connecting to the server through tub. trigger_cb is invoked
        after each connection is established."""

    def stop_connecting():
        """Stop connecting; used when this descriptor has been superseded by
        another one for the same server."""

    def try_to_connect():
        """Hurry up: reset the reconnection timer so a new attempt happens
        soon."""

    def get_serverid():
        """Return the serverid this descriptor describes."""

    def get_nickname():
        """Return the server's self-reported nickname (unicode)."""

    def get_announcement():
        """Return the announcement this descriptor was built from."""

    def get_rref():
        """Return the versioned RemoteReference if connected, else None."""
class NativeStorageClientDescriptor:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar announcement_time: when this descriptor was created, i.e. when we
                             received the announcement it was built from
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar announcement: the announcement dict; get_nickname() decodes the
                        server's self-reported nickname (unicode) from it

    @ivar rref: the RemoteReference, if connected, otherwise None. Its
                .version attribute holds the server's version info, as
                fetched by add_version_to_remote_reference (with
                VERSION_DEFAULTS as the fallback)
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    implements(IServerDescriptor)

    # Passed to add_version_to_remote_reference as the default version info.
    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32,
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, serverid, ann_d, min_shares=1):
        self.serverid = serverid
        self.announcement = ann_d
        self.min_shares = min_shares

        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        self._reconnector = None # set by start_connecting()
        self._trigger_cb = None

    def get_serverid(self):
        return self.serverid

    def get_nickname(self):
        return self.announcement["nickname"].decode("utf-8")
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time

    def start_connecting(self, tub, trigger_cb):
        """Ask tub to connect to the announced FURL, keeping the returned
        Reconnector. trigger_cb is queued (via eventually) each time
        _got_connection runs."""
        furl = self.announcement["FURL"]
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(serverid)s, getting versions",
                     serverid=self.serverid_s,
                     facility="tahoe.storage_broker", umid="coUECQ")
        # let the broker hurry up its other descriptors
        eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     serverid=self.serverid_s, umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        log.msg(format="%(serverid)s provided version info %(version)s",
                serverid=self.serverid_s, version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        """Return the versioned RemoteReference, or None if not connected."""
        return self.rref

    def _lost(self):
        log.msg(format="lost connection to %(serverid)s",
                serverid=self.serverid_s,
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another. A no-op
        # if start_connecting() was never called.
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up. A no-op if
        # start_connecting() was never called (nothing to hurry).
        if self._reconnector is not None:
            self._reconnector.reset()
260 class UnknownServerTypeError(Exception):