2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
17 from zope.interface import implements
18 from twisted.application import service
19 from twisted.internet import defer, reactor
20 from twisted.python.failure import Failure
21 from foolscap.api import Referenceable, fireEventually, RemoteException
22 from base64 import b32encode
23 from allmydata import uri as tahoe_uri
24 from allmydata.client import Client
25 from allmydata.storage.server import StorageServer, storage_index_to_dir
26 from allmydata.util import fileutil, idlib, hashutil
27 from allmydata.util.hashutil import sha1
28 from allmydata.test.common_web import HTTPClientGETFactory
29 from allmydata.interfaces import IStorageBroker, IServer
30 from allmydata.test.common import TEST_RSA_KEY_SIZE
class IntentionalError(Exception):
    """Raised by a LocalWrapper that has been marked 'broken' (see break_server)."""
    def __init__(self, original):
        # Wrap a local object so it can be used like a foolscap remote
        # reference.  NOTE(review): this chunk is missing some original
        # lines (the enclosing class statement and, presumably, a
        # 'self.broken' default that callRemote consults) -- confirm
        # against the full file.
        self.original = original        # the real (local) object being wrapped
        self.hung_until = None          # a Deferred while "hung", else None (see hang_server)
        self.post_call_notifier = None  # if set, called after each callRemote
        self.disconnectors = {}         # marker -> (f, args, kwargs), see notifyOnDisconnect
    def callRemoteOnly(self, methname, *args, **kwargs):
        # Fire-and-forget variant of callRemote: the Deferred is discarded.
        # NOTE(review): a trailing 'return None' is presumably missing from
        # this chunk.
        d = self.callRemote(methname, *args, **kwargs)
        del d # explicitly ignored
    def callRemote(self, methname, *args, **kwargs):
        # Simulate a foolscap remote call against the wrapped local object:
        # dispatch to its 'remote_<methname>' method, wrapping Referenceable
        # arguments and selected return values in LocalWrapper.
        # this is ideally a Membrane, but that's too hard. We do a shallow
        # wrapping of inbound arguments, and per-methodname wrapping of
        # selected return values.
        #
        # NOTE(review): several original lines are missing from this chunk
        # (the 'def wrap(a):' header, the '_really_call'/'_call' scaffolding,
        # parts of the broken/hung handling, and the final 'return d').  The
        # comments below describe only what the visible lines show.
        if isinstance(a, Referenceable):
            return LocalWrapper(a)
        args = tuple([wrap(a) for a in args])
        kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
        # direct dispatch to the wrapped object's remote_* method:
        meth = getattr(self.original, "remote_" + methname)
        return meth(*args, **kwargs)
        # "broken" handling: True means broken forever, a number counts down
        if self.broken is not True: # a counter, not boolean
        raise IntentionalError("I was asked to break")
        # "hung" handling: queue the real call behind the hung_until Deferred
        self.hung_until.addCallback(lambda ign: _really_call())
        self.hung_until.addCallback(lambda res: d2.callback(res))
        self.hung_until.addErrback(_err)
        d.addCallback(lambda res: _call())
        def _wrap_exception(f):
            # present failures to callers the way foolscap would
            return Failure(RemoteException(f))
        d.addErrback(_wrap_exception)
        def _return_membrane(res):
            # rather than complete the difficult task of building a
            # fully-general Membrane (which would locate all Referenceable
            # objects that cross the simulated wire and replace them with
            # wrappers), we special-case certain methods that we happen to
            # know will return Referenceables.
            if methname == "allocate_buckets":
                (alreadygot, allocated) = res
                for shnum in allocated:
                    allocated[shnum] = LocalWrapper(allocated[shnum])
            if methname == "get_buckets":
                res[shnum] = LocalWrapper(res[shnum])
        d.addCallback(_return_membrane)
        if self.post_call_notifier:
            d.addCallback(self.post_call_notifier, self, methname)
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Register f(*args, **kwargs) to run on (simulated) disconnect.
        # NOTE(review): the line creating the marker 'm' (and, presumably,
        # 'return m') is missing from this chunk.
        self.disconnectors[m] = (f, args, kwargs)
112 def dontNotifyOnDisconnect(self, marker):
113 del self.disconnectors[marker]
def wrap_storage_server(original):
    # Much of the upload/download code uses rref.version (which normally
    # comes from rrefutil.add_version_to_remote_reference). To avoid using a
    # network, we want a LocalWrapper here. Try to satisfy all these
    # constraints at the same time.
    wrapper = LocalWrapper(original)
    wrapper.version = original.remote_get_version()
    # NOTE(review): 'return wrapper' is not visible in this chunk, but
    # callers (add_server) use the return value -- confirm against the
    # full file.
class NoNetworkServer:
    """IServer-like stand-in whose 'rref' is a LocalWrapper, not a network ref.

    NOTE(review): several original lines are missing from this chunk
    (e.g. an implements(IServer) declaration, 'self.rref = rref' in
    __init__, __copy__, and most accessor bodies).  Comments below cover
    only the visible lines.
    """
    def __init__(self, serverid, rref):
        self.serverid = serverid
        # NOTE(review): 'self.rref = rref' presumably follows here
        # (get_version reads self.rref) -- line missing from this chunk.
    # body of __repr__ (its 'def' line is not visible in this chunk):
        return "<NoNetworkServer for %s>" % self.get_name()
    # Special method used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons.
    def __deepcopy__(self, memodict):
    def get_serverid(self):
    def get_permutation_seed(self):
    def get_lease_seed(self):
    def get_foolscap_write_enabler_seed(self):
        # body of get_name (its 'def' line is not visible in this chunk):
        return idlib.shortnodeid_b2a(self.serverid)
    def get_longname(self):
        return idlib.nodeid_b2a(self.serverid)
    def get_nickname(self):
    def get_version(self):
        # mirrors rref.version set up by wrap_storage_server()
        return self.rref.version
class NoNetworkStorageBroker:
    """Minimal IStorageBroker serving a fixed, in-process set of servers."""
    implements(IStorageBroker)
    def get_servers_for_psi(self, peer_selection_index):
        # Permuted-ring order: sort the connected servers by
        # sha1(peer_selection_index + permutation_seed).
        def _permuted(server):
            seed = server.get_permutation_seed()
            return sha1(peer_selection_index + seed).digest()
        return sorted(self.get_connected_servers(), key=_permuted)
    def get_connected_servers(self):
        # .client is assigned by NoNetworkClient.init_client_storage_broker
        return self.client._servers
    def get_nickname_for_serverid(self, serverid):
        # NOTE(review): body missing from this chunk (presumably
        # 'return None') -- confirm against the full file.
class NoNetworkClient(Client):
    """Client variant with all Tub/introducer/network setup stubbed out.

    NOTE(review): the bodies of the stub methods (presumably 'pass') are
    missing from this chunk.
    """
    def create_tub(self):
    def init_introducer_client(self):
    def setup_logging(self):
    def startService(self):
        # call MultiService's implementation directly, bypassing Client's
        service.MultiService.startService(self)
    def stopService(self):
        service.MultiService.stopService(self)
    def when_tub_ready(self):
        raise NotImplementedError("NoNetworkClient has no Tub")
    def init_control(self):
    def init_helper(self):
    def init_key_gen(self):
    def init_storage(self):
    def init_client_storage_broker(self):
        # use the loopback broker and point it back at this client
        self.storage_broker = NoNetworkStorageBroker()
        self.storage_broker.client = self
    def init_stub_client(self):
    #._servers will be set by the NoNetworkGrid which creates us
        # remainder of SimpleStats.__init__ (its 'def' line and, presumably,
        # a 'self.counters = {}' initialization are not visible in this chunk)
        self.stats_producers = []  # objects with get_stats(); see register_producer
203 def count(self, name, delta=1):
204 val = self.counters.setdefault(name, 0)
205 self.counters[name] = val + delta
207 def register_producer(self, stats_producer):
208 self.stats_producers.append(stats_producer)
        # body of SimpleStats.get_stats (its 'def' line, a 'stats = {}'
        # initialization, and the final 'return ret' are not visible in
        # this chunk): merge every producer's stats with the counters.
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
class NoNetworkGrid(service.MultiService):
    def __init__(self, basedir, num_clients=1, num_servers=10,
                 client_config_hooks={}):
        # Build num_servers StorageServers and num_clients NoNetworkClients
        # under basedir, all inside this MultiService.
        # NOTE(review): client_config_hooks={} is a mutable default; it is
        # only read here, but a None-plus-fallback default would be safer.
        # NOTE(review): some original lines are missing from this chunk
        # (e.g. 'self.clients = []', a '[node]\n' cfg header, 'f.close()',
        # and an 'if not c:' fallback before constructing NoNetworkClient).
        service.MultiService.__init__(self)
        self.basedir = basedir
        fileutil.make_dirs(basedir)
        self.servers_by_number = {} # maps to StorageServer instance
        self.wrappers_by_id = {} # maps to wrapped StorageServer instance
        self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
        # create and register the storage servers
        for i in range(num_servers):
            ss = self.make_server(i)
            self.add_server(i, ss)
        self.rebuild_serverlist()
        # create the clients, each with a minimal storage-disabled tahoe.cfg
        for i in range(num_clients):
            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
            clientdir = os.path.join(basedir, "clients",
                                     idlib.shortnodeid_b2a(clientid))
            fileutil.make_dirs(clientdir)
            f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
            f.write("nickname = client-%d\n" % i)
            f.write("web.port = tcp:0:interface=127.0.0.1\n")
            f.write("[storage]\n")
            f.write("enabled = false\n")
            if i in client_config_hooks:
                # this hook can either modify tahoe.cfg, or return an
                # entirely new Client instance
                c = client_config_hooks[i](clientdir)
            c = NoNetworkClient(clientdir)
            c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
            c.short_nodeid = b32encode(clientid).lower()[:8]
            c._servers = self.all_servers # can be updated later
            c.setServiceParent(self)
            self.clients.append(c)
    def make_server(self, i, readonly=False):
        # Build (but do not register) StorageServer number i, with a
        # deterministic serverid derived from i.
        serverid = hashutil.tagged_hash("serverid", str(i))[:20]
        serverdir = os.path.join(self.basedir, "servers",
                                 idlib.shortnodeid_b2a(serverid), "storage")
        fileutil.make_dirs(serverdir)
        ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(),
                           readonly_storage=readonly)
        # remember our own numbering for tests that need it
        ss._no_network_server_number = i
        # NOTE(review): 'return ss' is not visible in this chunk, but
        # __init__ uses the return value -- confirm against the full file.
271 def add_server(self, i, ss):
272 # to deal with the fact that all StorageServers are named 'storage',
273 # we interpose a middleman
274 middleman = service.MultiService()
275 middleman.setServiceParent(self)
276 ss.setServiceParent(middleman)
277 serverid = ss.my_nodeid
278 self.servers_by_number[i] = ss
279 wrapper = wrap_storage_server(ss)
280 self.wrappers_by_id[serverid] = wrapper
281 self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
282 self.rebuild_serverlist()
284 def get_all_serverids(self):
285 return self.proxies_by_id.keys()
287 def rebuild_serverlist(self):
288 self.all_servers = frozenset(self.proxies_by_id.values())
289 for c in self.clients:
290 c._servers = self.all_servers
    def remove_server(self, serverid):
        # it's enough to remove the server from c._servers (we don't actually
        # have to detach and stopService it)
        for i,ss in self.servers_by_number.items():
            if ss.my_nodeid == serverid:
                del self.servers_by_number[i]
                # NOTE(review): a 'break' presumably follows here in the
                # original; it is not visible in this chunk.  (In Python 2,
                # .items() returns a list, so deleting during the loop is
                # safe even without it.)
        del self.wrappers_by_id[serverid]
        del self.proxies_by_id[serverid]
        self.rebuild_serverlist()
304 def break_server(self, serverid, count=True):
305 # mark the given server as broken, so it will throw exceptions when
306 # asked to hold a share or serve a share. If count= is a number,
307 # throw that many exceptions before starting to work again.
308 self.wrappers_by_id[serverid].broken = count
310 def hang_server(self, serverid):
311 # hang the given server
312 ss = self.wrappers_by_id[serverid]
313 assert ss.hung_until is None
314 ss.hung_until = defer.Deferred()
316 def unhang_server(self, serverid):
317 # unhang the given server
318 ss = self.wrappers_by_id[serverid]
319 assert ss.hung_until is not None
320 ss.hung_until.callback(None)
        # body of GridTestMixin.setUp (its 'def' line is not visible in
        # this chunk): create and start the parent MultiService that will
        # hold the NoNetworkGrid.
        self.s = service.MultiService()
        self.s.startService()
        # body of tearDown (its 'def' line is not visible in this chunk):
        # stop the service tree; trial waits on the returned Deferred.
        return self.s.stopService()
332 def set_up_grid(self, num_clients=1, num_servers=10,
333 client_config_hooks={}):
334 # self.basedir must be set
335 self.g = NoNetworkGrid(self.basedir,
336 num_clients=num_clients,
337 num_servers=num_servers,
338 client_config_hooks=client_config_hooks)
339 self.g.setServiceParent(self.s)
340 self.client_webports = [c.getServiceNamed("webish").getPortnum()
341 for c in self.g.clients]
342 self.client_baseurls = [c.getServiceNamed("webish").getURL()
343 for c in self.g.clients]
345 def get_clientdir(self, i=0):
346 return self.g.clients[i].basedir
348 def get_serverdir(self, i):
349 return self.g.servers_by_number[i].storedir
351 def iterate_servers(self):
352 for i in sorted(self.g.servers_by_number.keys()):
353 ss = self.g.servers_by_number[i]
354 yield (i, ss, ss.storedir)
    def find_uri_shares(self, uri):
        # Return a sorted list of (shnum, serverid, sharefile-path) for
        # every share of 'uri' stored by the grid's servers.
        # NOTE(review): several lines are missing from this chunk (a
        # 'shares = []' initialization, a 'continue' for absent share
        # directories, and the parse of each entry into 'shnum').
        si = tahoe_uri.from_string(uri).get_storage_index()
        prefixdir = storage_index_to_dir(si)
        for i,ss in self.g.servers_by_number.items():
            serverid = ss.my_nodeid
            basedir = os.path.join(ss.sharedir, prefixdir)
            if not os.path.exists(basedir):
            for f in os.listdir(basedir):
                shares.append((shnum, serverid, os.path.join(basedir, f)))
        return sorted(shares)
    def copy_shares(self, uri):
        # Snapshot sharefile-path -> bytes for every share of 'uri'; the
        # result can later be handed to restore_all_shares().
        # NOTE(review): a 'shares = {}' initialization and 'return shares'
        # are not visible in this chunk.
        for (shnum, serverid, sharefile) in self.find_uri_shares(uri):
            shares[sharefile] = open(sharefile, "rb").read()
379 def restore_all_shares(self, shares):
380 for sharefile, data in shares.items():
381 open(sharefile, "wb").write(data)
    def delete_share(self, (shnum, serverid, sharefile)):
        # Remove one share, given a tuple as produced by find_uri_shares().
        # (Python 2 tuple-parameter syntax.)
        # NOTE(review): the body (presumably 'os.unlink(sharefile)') is not
        # visible in this chunk.
386 def delete_shares_numbered(self, uri, shnums):
387 for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
388 if i_shnum in shnums:
389 os.unlink(i_sharefile)
391 def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
392 sharedata = open(sharefile, "rb").read()
393 corruptdata = corruptor_function(sharedata)
394 open(sharefile, "wb").write(corruptdata)
396 def corrupt_shares_numbered(self, uri, shnums, corruptor, debug=False):
397 for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
398 if i_shnum in shnums:
399 sharedata = open(i_sharefile, "rb").read()
400 corruptdata = corruptor(sharedata, debug=debug)
401 open(i_sharefile, "wb").write(corruptdata)
403 def corrupt_all_shares(self, uri, corruptor, debug=False):
404 for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
405 sharedata = open(i_sharefile, "rb").read()
406 corruptdata = corruptor(sharedata, debug=debug)
407 open(i_sharefile, "wb").write(corruptdata)
    def GET(self, urlpath, followRedirect=False, return_response=False,
            method="GET", clientnum=0, **kwargs):
        # Issue an HTTP request against client #clientnum's webish server.
        # if return_response=True, this fires with (data, statuscode,
        # respheaders) instead of just data.
        assert not isinstance(urlpath, unicode)
        url = self.client_baseurls[clientnum] + urlpath
        factory = HTTPClientGETFactory(url, method=method,
                                       followRedirect=followRedirect, **kwargs)
        reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
        # NOTE(review): the 'if return_response:' header and the
        # 'def _got_data(data):' line are not visible in this chunk; the
        # next two lines are the remains of that block.
        return (data, factory.status, factory.response_headers)
        d.addCallback(_got_data)
        return factory.deferred

    def PUT(self, urlpath, **kwargs):
        # Same as GET, but with the HTTP method forced to PUT.
        return self.GET(urlpath, method="PUT", **kwargs)