3 I contain the client-side code which speaks to storage servers, in particular
4 the foolscap-based server implemented in src/allmydata/storage/*.py .
9 # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10 # create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11 # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12 # to clients. webapi status pages call broker.get_info_about_serverid.
14 # 2: move get_info methods to the descriptor, webapi status pages call
15 # broker.get_descriptor_for_serverid().get_info
17 # 3?later?: store descriptors in UploadResults/etc instead of serverids,
18 # webapi status pages call descriptor.get_info and don't use storage_broker
21 # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22 # optional. This closes #467
24 # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25 # clients. Clients stop doing callRemote(), use NativeStorageClient methods
26 # instead (which might do something else, e.g. speak HTTP). The
27 # introducer and tahoe.cfg only create NativeStorageClients for now.
29 # 6: implement other sorts of IStorageClient classes: S3, etc
32 from hashlib import sha1
34 # hashlib was added in Python 2.5
40 from zope.interface import implements, Interface
41 from foolscap.api import eventually
42 from allmydata.interfaces import IStorageBroker
43 from allmydata.util import idlib, log
44 from allmydata.util.assertutil import _assert, precondition
45 from allmydata.util.rrefutil import add_version_to_remote_reference
47 # who is responsible for de-duplication?
49 # IC remembers the unpacked announcements it receives, to provide for late
50 # subscribers and to remove duplicates
52 # if a client subscribes after startup, will they receive old announcements?
55 # who will be responsible for signature checking?
56 # make it be IntroducerClient, so they can push the filter outwards and
57 # reduce inbound network traffic
59 # what should the interface between StorageFarmBroker and IntroducerClient
61 # don't pass signatures: only pass validated blessed-objects
class StorageFarmBroker:
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.
    """
    # NOTE: the docstring above must stay the first statement of the class
    # body. If it came after implements(), it would be a discarded expression
    # and StorageFarmBroker.__doc__ would be None.
    implements(IStorageBroker)

    def __init__(self, tub, permute_peers):
        """
        @param tub: handed to each descriptor's start_connecting(), which
                    uses it to connect to the announced server
        @param permute_peers: must be true; the unpermuted mode is not
                              implemented yet
        """
        self.tub = tub
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        # self.descriptors maps serverid -> IServerDescriptor, and keeps
        # track of all the storage servers that we've heard about. Each
        # descriptor manages its own Reconnector, and will give us a
        # RemoteReference when we ask them for it.
        self.descriptors = {}
        # self.test_servers are statically configured from unit tests
        self.test_servers = {} # serverid -> rref
        self.introducer_client = None

    # these two are used in unit tests
    def test_add_server(self, serverid, rref):
        """Register a statically-configured serverid -> rref mapping."""
        self.test_servers[serverid] = rref
    def test_add_descriptor(self, serverid, dsc):
        """Register a descriptor directly, bypassing the introducer."""
        self.descriptors[serverid] = dsc

    def use_introducer(self, introducer_client):
        """Remember introducer_client and subscribe to its "storage"
        announcements, which are delivered to _got_announcement()."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_announcement(self, serverid, ann_d):
        """Create, or replace, the descriptor for an announced server.

        @param serverid: the server's id, a 20-character str
        @param ann_d: the announcement dict; its "service-name" must be
                      "storage"

        An announcement equal to the one we already hold is ignored. A
        different announcement replaces the old descriptor, which is told to
        stop connecting, and a new descriptor starts connecting.
        """
        precondition(isinstance(serverid, str), serverid)
        precondition(len(serverid) == 20, serverid)
        assert ann_d["service-name"] == "storage"
        old = self.descriptors.get(serverid)
        if old is not None:
            if old.get_announcement() == ann_d:
                return # duplicate: keep the existing descriptor
            # replacement
            del self.descriptors[serverid]
            old.stop_connecting()
            # now we forget about them and start using the new one
        dsc = NativeStorageClientDescriptor(serverid, ann_d)
        self.descriptors[serverid] = dsc
        dsc.start_connecting(self.tub, self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.descriptors.values():
            dsc.try_to_connect()

    def get_servers_for_index(self, peer_selection_index):
        """Return the servers from get_all_servers() as a list of
        (serverid, versioned-rref) tuples. The list is sorted by
        sha1(peer_selection_index + serverid), so each index yields its own
        deterministic permutation of the servers.
        """
        assert self.permute_peers == True
        key = peer_selection_index
        return sorted(self.get_all_servers(),
                      key=lambda x: sha1(key+x[0]).digest())

    def get_all_servers(self):
        """Return a frozenset of (serverid, versioned-rref) tuples.

        The result contains every test server, plus every descriptor that is
        currently connected (get_rref() is not None). If both exist for the
        same serverid, the descriptor's rref wins.
        """
        servers = dict(self.test_servers)
        for serverid, dsc in self.descriptors.items():
            rref = dsc.get_rref()
            if rref is not None:
                servers[serverid] = rref
        result = frozenset(servers.items())
        # Build the serverid set once. Arguments to _assert are evaluated
        # eagerly, so calling get_all_serverids() inline twice did the work
        # twice on every call.
        all_serverids = self.get_all_serverids()
        _assert(len(result) <= len(all_serverids), result, all_serverids)
        return result

    def get_all_serverids(self):
        """Return a frozenset of every serverid we know about, connected or
        not: test servers plus descriptors."""
        return frozenset(self.test_servers).union(self.descriptors)

    def get_all_descriptors(self):
        """Return all descriptors, sorted by serverid."""
        return sorted(self.descriptors.values(),
                      key=lambda dsc: dsc.get_serverid())

    def get_nickname_for_serverid(self, serverid):
        """Return the descriptor's nickname for serverid, or None if we have
        no descriptor for it."""
        dsc = self.descriptors.get(serverid)
        if dsc is None:
            return None
        return dsc.get_nickname()
class IServerDescriptor(Interface):
    """The per-server object that StorageFarmBroker keeps in its
    'descriptors' table. Besides start_connecting(), the broker calls every
    method declared here on its descriptors.
    """
    def start_connecting(tub, trigger_cb):
        """Start connecting to the server through tub. trigger_cb is called
        after a connection is established; the broker uses it to hurry the
        other descriptors along."""
    def get_serverid():
        """Return the server's id (a 20-character str)."""
    def get_nickname():
        """Return the server's self-reported nickname, as unicode."""
    def get_announcement():
        """Return the announcement dict this descriptor was created from."""
    def get_rref():
        """Return the RemoteReference if currently connected, else None."""
    def stop_connecting():
        """Stop connecting; used when this descriptor has been superseded."""
    def try_to_connect():
        """Make a connection attempt soon instead of waiting out any pending
        reconnection delay."""
class NativeStorageClientDescriptor:
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar announcement_time: when this descriptor was created, i.e. when its
                             announcement was received
    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar announcement: the announcement dict I was created from. The
                        server's self-reported nickname (unicode) comes from
                        its "nickname" entry; see get_nickname().
    @ivar rref: the versioned RemoteReference, if connected, otherwise None.
                The server's version dict is available as rref.version.
    @ivar remote_host: the IAddress, if connected, otherwise None
    """
    implements(IServerDescriptor)

    # Passed to add_version_to_remote_reference() on each new connection.
    # Presumably this is the version info assumed for servers that cannot
    # supply their own (cf. "unknown: no get_version()"); confirm against
    # allmydata.util.rrefutil.
    VERSION_DEFAULTS = {
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        { "maximum-immutable-share-size": 2**32,
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          },
        "application-version": "unknown: no get_version()",
        }

    def __init__(self, serverid, ann_d, min_shares=1):
        self.serverid = serverid
        self.announcement = ann_d
        self.min_shares = min_shares

        self.serverid_s = idlib.shortnodeid_b2a(self.serverid)
        self.announcement_time = time.time()
        self.last_connect_time = None
        self.last_loss_time = None
        self.remote_host = None
        self.rref = None
        self._reconnector = None
        self._trigger_cb = None

    def get_serverid(self):
        return self.serverid

    def get_nickname(self):
        return self.announcement["nickname"].decode("utf-8")
    def get_announcement(self):
        return self.announcement
    def get_remote_host(self):
        return self.remote_host
    def get_last_connect_time(self):
        return self.last_connect_time
    def get_last_loss_time(self):
        return self.last_loss_time
    def get_announcement_time(self):
        return self.announcement_time

    def start_connecting(self, tub, trigger_cb):
        """Connect to the announced FURL through tub. The returned
        reconnector is kept so that stop_connecting()/try_to_connect() can
        control it. trigger_cb is invoked (via eventually()) on every new
        connection."""
        furl = self.announcement["FURL"]
        self._trigger_cb = trigger_cb
        self._reconnector = tub.connectTo(furl, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(serverid)s, getting versions",
                     serverid=self.serverid_s,
                     facility="tahoe.storage_broker", umid="coUECQ")

        # let the broker nudge the other descriptors to reconnect too
        eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addErrback(log.err, format="storageclient._got_connection",
                     serverid=self.serverid_s, umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        # rref here is the versioned reference produced by
        # add_version_to_remote_reference(); it carries .version
        log.msg(format="%(serverid)s provided version info %(version)s",
                serverid=self.serverid_s, version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self.remote_host = rref.getPeer()
        self.rref = rref
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        """Return the versioned RemoteReference, or None if not connected."""
        return self.rref

    def _lost(self):
        log.msg(format="lost connection to %(serverid)s",
                serverid=self.serverid_s,
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        self.rref = None
        self.remote_host = None

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        self._reconnector.reset()
273 class UnknownServerTypeError(Exception):