2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
18 from twisted.application import service
19 from twisted.internet import reactor
20 from twisted.python.failure import Failure
21 from foolscap.api import Referenceable, fireEventually, RemoteException
22 from base64 import b32encode
23 from allmydata import uri as tahoe_uri
24 from allmydata.client import Client
25 from allmydata.storage.server import StorageServer, storage_index_to_dir
26 from allmydata.util import fileutil, idlib, hashutil
27 from allmydata.introducer.client import RemoteServiceConnector
28 from allmydata.test.common_web import HTTPClientGETFactory
30 class IntentionalError(Exception):
37 def __init__(self, original):
38 self.original = original
40 self.post_call_notifier = None
41 self.disconnectors = {}
43 def callRemoteOnly(self, methname, *args, **kwargs):
44 d = self.callRemote(methname, *args, **kwargs)
47 def callRemote(self, methname, *args, **kwargs):
48 # this is ideally a Membrane, but that's too hard. We do a shallow
49 # wrapping of inbound arguments, and per-methodname wrapping of
50 # selected return values.
52 if isinstance(a, Referenceable):
53 return LocalWrapper(a)
56 args = tuple([wrap(a) for a in args])
57 kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
60 raise IntentionalError("I was asked to break")
61 meth = getattr(self.original, "remote_" + methname)
62 return meth(*args, **kwargs)
64 d.addCallback(lambda res: _call())
65 def _wrap_exception(f):
66 return Failure(RemoteException(f))
67 d.addErrback(_wrap_exception)
68 def _return_membrane(res):
69 # rather than complete the difficult task of building a
70 # fully-general Membrane (which would locate all Referenceable
71 # objects that cross the simulated wire and replace them with
72 # wrappers), we special-case certain methods that we happen to
73 # know will return Referenceables.
74 if methname == "allocate_buckets":
75 (alreadygot, allocated) = res
76 for shnum in allocated:
77 allocated[shnum] = LocalWrapper(allocated[shnum])
78 if methname == "get_buckets":
80 res[shnum] = LocalWrapper(res[shnum])
82 d.addCallback(_return_membrane)
83 if self.post_call_notifier:
84 d.addCallback(self.post_call_notifier, methname)
87 def notifyOnDisconnect(self, f, *args, **kwargs):
89 self.disconnectors[m] = (f, args, kwargs)
def dontNotifyOnDisconnect(self, marker):
    """Drop the entry stored in ``self.disconnectors`` under ``marker``.

    Raises KeyError when ``marker`` is not a key of that dict.
    """
    registry = self.disconnectors
    del registry[marker]
94 def wrap(original, service_name):
95 # Much of the upload/download code uses rref.version (which normally
96 # comes from rrefutil.add_version_to_remote_reference). To avoid using a
97 # network, we want a LocalWrapper here. Try to satisfy all these
98 # constraints at the same time.
99 wrapper = LocalWrapper(original)
101 version = original.remote_get_version()
102 except AttributeError:
103 version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
104 wrapper.version = version
class NoNetworkStorageBroker:
    """Storage-broker stand-in that ranks servers purely by hashing.

    ``self.client`` is assigned from outside after construction. Its
    ``_servers`` attribute must be an iterable of tuples whose first item
    can be concatenated with the lookup key (the server id).
    """

    def get_servers(self, key):
        """Return the client's server entries as a list permuted by ``key``.

        Each entry is ranked by the SHA-1 digest of ``key`` concatenated
        with the entry's first item, so the ordering is deterministic for
        a given key and set of servers.
        """
        try:
            from hashlib import sha1
        except ImportError:
            # interpreters older than Python 2.5 only ship the legacy
            # 'sha' module, which produces the same digest
            from sha import new as sha1
        return sorted(self.client._servers,
                      key=lambda entry: sha1(key + entry[0]).digest())
112 class NoNetworkClient(Client):
114 def create_tub(self):
116 def init_introducer_client(self):
118 def setup_logging(self):
def startService(self):
    """Start this service and its children.

    The MultiService implementation is invoked explicitly rather than
    whatever the Client base class provides.
    """
    service.MultiService.startService(self)
def stopService(self):
    """Stop this service and its children.

    The MultiService implementation is invoked explicitly rather than
    whatever the Client base class provides.
    """
    # NOTE(review): whatever MultiService.stopService returns is discarded,
    # so this method always returns None -- confirm no caller needs to wait
    # on the shutdown.
    service.MultiService.stopService(self)
def when_tub_ready(self):
    """Refuse unconditionally: a NoNetworkClient has no Tub to wait for.

    Raises NotImplementedError on every call.
    """
    message = "NoNetworkClient has no Tub"
    raise NotImplementedError(message)
126 def init_control(self):
128 def init_helper(self):
130 def init_key_gen(self):
132 def init_storage(self):
def init_client_storage_broker(self):
    """Install a NoNetworkStorageBroker that points back at this client."""
    broker = NoNetworkStorageBroker()
    self.storage_broker = broker
    # the broker reads this client's _servers through the back-reference
    broker.client = self
137 def init_stub_client(self):
140 def get_servers(self, service_name):
143 def get_nickname_for_serverid(self, serverid):
149 self.stats_producers = []
def count(self, name, delta=1):
    """Add ``delta`` to the counter ``name`` in ``self.counters``.

    A counter that does not exist yet is created at zero before the
    addition.
    """
    counters = self.counters
    if name not in counters:
        counters[name] = 0
    counters[name] = counters[name] + delta
def register_producer(self, stats_producer):
    """Append ``stats_producer`` to ``self.stats_producers``.

    Registration order is preserved; the same producer may be added more
    than once.
    """
    producers = self.stats_producers
    producers.append(stats_producer)
160 for sp in self.stats_producers:
161 stats.update(sp.get_stats())
162 ret = { 'counters': self.counters, 'stats': stats }
165 class NoNetworkGrid(service.MultiService):
166 def __init__(self, basedir, num_clients=1, num_servers=10,
167 client_config_hooks={}):
168 service.MultiService.__init__(self)
169 self.basedir = basedir
170 fileutil.make_dirs(basedir)
172 self.servers_by_number = {}
173 self.servers_by_id = {}
176 for i in range(num_servers):
177 ss = self.make_server(i)
178 self.add_server(i, ss)
180 for i in range(num_clients):
181 clientid = hashutil.tagged_hash("clientid", str(i))[:20]
182 clientdir = os.path.join(basedir, "clients",
183 idlib.shortnodeid_b2a(clientid))
184 fileutil.make_dirs(clientdir)
185 f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
187 f.write("nickname = client-%d\n" % i)
188 f.write("web.port = tcp:0:interface=127.0.0.1\n")
189 f.write("[storage]\n")
190 f.write("enabled = false\n")
193 if i in client_config_hooks:
194 # this hook can either modify tahoe.cfg, or return an
195 # entirely new Client instance
196 c = client_config_hooks[i](clientdir)
198 c = NoNetworkClient(clientdir)
200 c.short_nodeid = b32encode(clientid).lower()[:8]
201 c._servers = self.all_servers # can be updated later
202 c.setServiceParent(self)
203 self.clients.append(c)
205 def make_server(self, i):
206 serverid = hashutil.tagged_hash("serverid", str(i))[:20]
207 serverdir = os.path.join(self.basedir, "servers",
208 idlib.shortnodeid_b2a(serverid))
209 fileutil.make_dirs(serverdir)
210 ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats())
def add_server(self, i, ss):
    """Attach storage server ``ss`` to the grid as server number ``i``.

    The server is recorded in ``self.servers_by_number`` (raw object) and
    in ``self.servers_by_id`` (wrapped via ``wrap(ss, "storage")``, keyed
    by ``ss.my_nodeid``). ``self.all_servers`` is then rebuilt and pushed
    to every client already in ``self.clients``.
    """
    # to deal with the fact that all StorageServers are named 'storage',
    # we interpose a middleman
    intermediary = service.MultiService()
    intermediary.setServiceParent(self)
    ss.setServiceParent(intermediary)

    nodeid = ss.my_nodeid
    self.servers_by_number[i] = ss
    self.servers_by_id[nodeid] = wrap(ss, "storage")

    # all_servers is replaced by a fresh frozenset rather than updated in
    # place, so each existing client must be handed the new object
    snapshot = frozenset(self.servers_by_id.items())
    self.all_servers = snapshot
    for client in self.clients:
        client._servers = snapshot
228 self.s = service.MultiService()
229 self.s.startService()
232 return self.s.stopService()
def set_up_grid(self, num_clients=1, num_servers=10,
                client_config_hooks={}):
    """Create a NoNetworkGrid under ``self.basedir`` and note its web ports.

    The grid is stored as ``self.g`` and parented to ``self.s``. The three
    keyword arguments are handed to NoNetworkGrid unchanged. Afterwards
    ``self.client_webports`` lists the port of each client's "webish"
    listener and ``self.client_baseurls`` the matching
    ``http://localhost:<port>/`` URLs, both in client order.
    """
    # self.basedir must be set
    self.g = NoNetworkGrid(self.basedir,
                           num_clients=num_clients,
                           num_servers=num_servers,
                           client_config_hooks=client_config_hooks)
    self.g.setServiceParent(self.s)

    ports = []
    for client in self.g.clients:
        webish = client.getServiceNamed("webish")
        ports.append(webish.listener._port.getHost().port)
    self.client_webports = ports
    self.client_baseurls = ["http://localhost:%d/" % port for port in ports]
def get_clientdir(self, i=0):
    """Return the ``basedir`` of client number ``i`` (the first by default)."""
    client = self.g.clients[i]
    return client.basedir
def get_serverdir(self, i):
    """Return the ``storedir`` of the storage server registered as number ``i``."""
    server = self.g.servers_by_number[i]
    return server.storedir
def iterate_servers(self):
    """Yield ``(number, server, storedir)`` for every storage server.

    Servers are produced in ascending order of their number.
    """
    for number in sorted(self.g.servers_by_number):
        server = self.g.servers_by_number[number]
        yield (number, server, server.storedir)
258 def find_shares(self, uri):
259 si = tahoe_uri.from_string(uri).get_storage_index()
260 prefixdir = storage_index_to_dir(si)
262 for i,ss in self.g.servers_by_number.items():
263 serverid = ss.my_nodeid
264 basedir = os.path.join(ss.storedir, "shares", prefixdir)
265 if not os.path.exists(basedir):
267 for f in os.listdir(basedir):
270 shares.append((shnum, serverid, os.path.join(basedir, f)))
273 return sorted(shares)
275 def delete_share(self, (shnum, serverid, sharefile)):
def delete_shares_numbered(self, uri, shnums):
    """Delete the share files of ``uri`` whose share number is in ``shnums``.

    Shares are located with ``self.find_shares(uri)``; files whose number
    is not listed are left untouched.
    """
    for (shnum, _serverid, sharefile) in self.find_shares(uri):
        if shnum not in shnums:
            continue
        os.unlink(sharefile)
def corrupt_share(self, share, corruptor_function):
    """Overwrite one share file with a corrupted copy of its own contents.

    ``share`` is a ``(shnum, serverid, sharefile)`` tuple; only the
    ``sharefile`` path is used. ``corruptor_function`` is called once with
    the file's current bytes and must return the bytes to write back.
    """
    # The tuple is unpacked here instead of in the signature: tuple
    # parameters are a syntax error on Python 3, and callers can only pass
    # this argument positionally anyway.
    (_shnum, _serverid, sharefile) = share

    # try/finally closes each handle deterministically without depending
    # on garbage collection, and is valid on every Python version.
    f = open(sharefile, "rb")
    try:
        sharedata = f.read()
    finally:
        f.close()

    corruptdata = corruptor_function(sharedata)

    f = open(sharefile, "wb")
    try:
        f.write(corruptdata)
    finally:
        f.close()
def corrupt_shares_numbered(self, uri, shnums, corruptor):
    """Corrupt, in place, the shares of ``uri`` numbered in ``shnums``.

    Shares are located with ``self.find_shares(uri)``. For each selected
    share file, ``corruptor`` is called with the file's current bytes and
    its return value is written back over the file. Other shares are left
    untouched.
    """
    for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
        if i_shnum not in shnums:
            continue

        # try/finally closes each handle deterministically without
        # depending on garbage collection, and is valid on every Python
        # version.
        f = open(i_sharefile, "rb")
        try:
            sharedata = f.read()
        finally:
            f.close()

        corruptdata = corruptor(sharedata)

        f = open(i_sharefile, "wb")
        try:
            f.write(corruptdata)
        finally:
            f.close()
295 def GET(self, urlpath, followRedirect=False, return_response=False,
296 method="GET", clientnum=0, **kwargs):
297 # if return_response=True, this fires with (data, statuscode,
298 # respheaders) instead of just data.
299 assert not isinstance(urlpath, unicode)
300 url = self.client_baseurls[clientnum] + urlpath
301 factory = HTTPClientGETFactory(url, method=method,
302 followRedirect=followRedirect, **kwargs)
303 reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
306 return (data, factory.status, factory.response_headers)
308 d.addCallback(_got_data)
309 return factory.deferred