2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
import os
import sha  # py2 stdlib module (deprecated); provides sha.new used by NoNetworkStorageBroker
from base64 import b32encode

from zope.interface import implements
from twisted.application import service
from twisted.internet import reactor
from twisted.python.failure import Failure
from foolscap.api import Referenceable, fireEventually, RemoteException

from allmydata import uri as tahoe_uri
from allmydata.client import Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.util import fileutil, idlib, hashutil
from allmydata.test.common_web import HTTPClientGETFactory
from allmydata.interfaces import IStorageBroker
31 class IntentionalError(Exception):
    def __init__(self, original):
        # 'original' is the real server-side object; callRemote() below
        # invokes its remote_* methods directly, with no network in between.
        self.original = original
        # NOTE(review): a 'self.broken = False' line appears to be missing
        # from this view -- callRemote's raise of IntentionalError looks like
        # it was guarded by such a flag; confirm against upstream. The
        # enclosing 'class LocalWrapper' header is also not visible here.
        # if set, fired (via addCallback) with (result, methname) after calls
        self.post_call_notifier = None
        # marker -> (f, args, kwargs); managed by (dont)notifyOnDisconnect
        self.disconnectors = {}
    def callRemoteOnly(self, methname, *args, **kwargs):
        # Fire-and-forget variant of callRemote: the caller is not given the
        # result. NOTE(review): the method ends without using 'd' in this
        # view -- the original presumably discarded it explicitly; confirm
        # against upstream.
        d = self.callRemote(methname, *args, **kwargs)
    def callRemote(self, methname, *args, **kwargs):
        # this is ideally a Membrane, but that's too hard. We do a shallow
        # wrapping of inbound arguments, and per-methodname wrapping of
        # selected return values.
        #
        # NOTE(review): several lines appear to be missing from this view:
        # the 'def wrap(a):' header (and its 'return a' else-branch) for the
        # isinstance clause below, a 'def _call():' wrapper with its
        # 'if self.broken:' guard around the raise, the creation of the
        # deferred 'd' (presumably fireEventually(), which is imported above),
        # a 'for shnum in res:' loop in the get_buckets branch, and the final
        # 'return d'. Do not restyle until reconciled with upstream.
            if isinstance(a, Referenceable):
                return LocalWrapper(a)
        # shallow-wrap every inbound argument so Referenceables stay local
        args = tuple([wrap(a) for a in args])
        kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
                raise IntentionalError("I was asked to break")
            meth = getattr(self.original, "remote_" + methname)
            return meth(*args, **kwargs)
        d.addCallback(lambda res: _call())
        def _wrap_exception(f):
            # deliver local failures as RemoteException, the way a real
            # remote call would
            return Failure(RemoteException(f))
        d.addErrback(_wrap_exception)
        def _return_membrane(res):
            # rather than complete the difficult task of building a
            # fully-general Membrane (which would locate all Referenceable
            # objects that cross the simulated wire and replace them with
            # wrappers), we special-case certain methods that we happen to
            # know will return Referenceables.
            if methname == "allocate_buckets":
                (alreadygot, allocated) = res
                for shnum in allocated:
                    allocated[shnum] = LocalWrapper(allocated[shnum])
            if methname == "get_buckets":
                res[shnum] = LocalWrapper(res[shnum])
        d.addCallback(_return_membrane)
        if self.post_call_notifier:
            d.addCallback(self.post_call_notifier, methname)
    def notifyOnDisconnect(self, f, *args, **kwargs):
        # Register f(*args, **kwargs) to fire on a (simulated) disconnect.
        # NOTE(review): 'm' is never assigned in this view and no marker is
        # returned to the caller (dontNotifyOnDisconnect expects one) --
        # lines appear to be missing; confirm against upstream.
        self.disconnectors[m] = (f, args, kwargs)
92 def dontNotifyOnDisconnect(self, marker):
93 del self.disconnectors[marker]
def wrap_storage_server(original):
    """Wrap a StorageServer in a LocalWrapper and return the wrapper.

    Much of the upload/download code uses rref.version (which normally
    comes from rrefutil.add_version_to_remote_reference). To avoid using a
    network, we want a LocalWrapper here. Try to satisfy all these
    constraints at the same time.
    """
    wrapper = LocalWrapper(original)
    # populate .version the way add_version_to_remote_reference would
    wrapper.version = original.remote_get_version()
    # BUGFIX: the function previously fell off the end and returned None,
    # but its caller (add_server) stores the result in servers_by_id.
    return wrapper
class NoNetworkStorageBroker:
    # Minimal IStorageBroker backed by the grid's in-process server list.
    # 'self.client' (which holds ._servers, a set of (serverid, wrapped
    # server) tuples) is attached by NoNetworkClient.init_client_storage_broker.
    implements(IStorageBroker)
    def get_servers_for_index(self, key):
        # permute servers around the ring by hashing key+serverid
        # NOTE(review): 'sha' is not imported in this view -- confirm the
        # missing 'import sha' (or a hashutil equivalent) against upstream.
        return sorted(self.client._servers,
                      key=lambda x: sha.new(key+x[0]).digest())
    def get_all_servers(self):
        return frozenset(self.client._servers)
    def get_nickname_for_serverid(self, serverid):
        """NOTE(review): body appears to be missing from this view (this
        docstring-only body falls through to None); confirm upstream."""
class NoNetworkClient(Client):
    # A Client with every network-facing initializer overridden to do
    # nothing: per the file header, there is no Tub, no introducer, no
    # helper, and no client-side storage server.
    # NOTE(review): the stub methods below had no bodies in this view
    # (original 'pass' lines appear to be missing); the docstrings added
    # here restore valid syntax without changing behavior.
    def create_tub(self):
        """Stub: no foolscap Tub is created."""
    def init_introducer_client(self):
        """Stub: no Introducer is used."""
    def setup_logging(self):
        """Stub: skip log setup."""
    def startService(self):
        # bypass Client.startService, which would touch the (absent) Tub
        service.MultiService.startService(self)
    def stopService(self):
        service.MultiService.stopService(self)
    def when_tub_ready(self):
        raise NotImplementedError("NoNetworkClient has no Tub")
    def init_control(self):
        """Stub: no control port."""
    def init_helper(self):
        """Stub: no upload helper."""
    def init_key_gen(self):
        """Stub: no key generator."""
    def init_storage(self):
        """Stub: storage servers live in the NoNetworkGrid, not the client."""
    def init_client_storage_broker(self):
        # loopback broker; it needs a backref to reach our ._servers set
        self.storage_broker = NoNetworkStorageBroker()
        self.storage_broker.client = self
    def init_stub_client(self):
        """Stub: no stub-client announcement."""
    #._servers will be set by the NoNetworkGrid which creates us
146 self.stats_producers = []
148 def count(self, name, delta=1):
149 val = self.counters.setdefault(name, 0)
150 self.counters[name] = val + delta
152 def register_producer(self, stats_producer):
153 self.stats_producers.append(stats_producer)
        # NOTE(review): fragment -- the 'def get_stats(self):' header, the
        # 'stats = {}' initialization, and the final 'return ret' appear to
        # be missing from this view.
        for sp in self.stats_producers:
            stats.update(sp.get_stats())
        ret = { 'counters': self.counters, 'stats': stats }
class NoNetworkGrid(service.MultiService):
    # Builds num_servers StorageServers (real shares on disk) and
    # num_clients NoNetworkClients under one MultiService, wired together
    # without any network -- see the file header.
    def __init__(self, basedir, num_clients=1, num_servers=10,
                 client_config_hooks={}):
        # NOTE(review): several lines appear to be missing from this view:
        # 'self.clients = []' (appended to below), the '[node]' section
        # header and f.close() for the tahoe.cfg being written, and the
        # 'if c is None:' fallback between the hook call and the
        # NoNetworkClient construction. Reconcile with upstream before
        # restyling. Also note the mutable {} default for
        # client_config_hooks (read-only here, but fragile).
        service.MultiService.__init__(self)
        self.basedir = basedir
        fileutil.make_dirs(basedir)
        self.servers_by_number = {}
        self.servers_by_id = {}
        for i in range(num_servers):
            ss = self.make_server(i)
            self.add_server(i, ss)
        for i in range(num_clients):
            # each client gets a nodeid derived from its index, and its own
            # basedir with a minimal tahoe.cfg (storage disabled)
            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
            clientdir = os.path.join(basedir, "clients",
                                     idlib.shortnodeid_b2a(clientid))
            fileutil.make_dirs(clientdir)
            f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
            f.write("nickname = client-%d\n" % i)
            f.write("web.port = tcp:0:interface=127.0.0.1\n")
            f.write("[storage]\n")
            f.write("enabled = false\n")
            if i in client_config_hooks:
                # this hook can either modify tahoe.cfg, or return an
                # entirely new Client instance
                c = client_config_hooks[i](clientdir)
            c = NoNetworkClient(clientdir)
            c.short_nodeid = b32encode(clientid).lower()[:8]
            c._servers = self.all_servers # can be updated later
            c.setServiceParent(self)
            self.clients.append(c)
202 def make_server(self, i):
203 serverid = hashutil.tagged_hash("serverid", str(i))[:20]
204 serverdir = os.path.join(self.basedir, "servers",
205 idlib.shortnodeid_b2a(serverid))
206 fileutil.make_dirs(serverdir)
207 ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats())
210 def add_server(self, i, ss):
211 # to deal with the fact that all StorageServers are named 'storage',
212 # we interpose a middleman
213 middleman = service.MultiService()
214 middleman.setServiceParent(self)
215 ss.setServiceParent(middleman)
216 serverid = ss.my_nodeid
217 self.servers_by_number[i] = ss
218 self.servers_by_id[serverid] = wrap_storage_server(ss)
219 self.all_servers = frozenset(self.servers_by_id.items())
220 for c in self.clients:
221 c._servers = self.all_servers
        # NOTE(review): fragment -- the enclosing 'class GridTestMixin' and
        # the 'def setUp(self):' header appear to be missing from this view.
        self.s = service.MultiService()
        self.s.startService()
        # NOTE(review): fragment of tearDown() -- its 'def' header appears
        # to be missing from this view.
        return self.s.stopService()
231 def set_up_grid(self, num_clients=1, num_servers=10,
232 client_config_hooks={}):
233 # self.basedir must be set
234 self.g = NoNetworkGrid(self.basedir,
235 num_clients=num_clients,
236 num_servers=num_servers,
237 client_config_hooks=client_config_hooks)
238 self.g.setServiceParent(self.s)
239 self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
240 for c in self.g.clients]
241 self.client_baseurls = ["http://localhost:%d/" % p
242 for p in self.client_webports]
244 def get_clientdir(self, i=0):
245 return self.g.clients[i].basedir
247 def get_serverdir(self, i):
248 return self.g.servers_by_number[i].storedir
250 def iterate_servers(self):
251 for i in sorted(self.g.servers_by_number.keys()):
252 ss = self.g.servers_by_number[i]
253 yield (i, ss, ss.storedir)
    def find_shares(self, uri):
        # Return a sorted list of (shnum, serverid, sharefile-path) tuples
        # for every share of 'uri' found on any server's disk.
        # NOTE(review): lines appear to be missing from this view: the
        # 'shares = []' initialization, a 'continue' under the existence
        # check below, and the parse of the share filename (int(f)) that
        # produces 'shnum'. Reconcile with upstream before restyling.
        si = tahoe_uri.from_string(uri).get_storage_index()
        prefixdir = storage_index_to_dir(si)
        for i,ss in self.g.servers_by_number.items():
            serverid = ss.my_nodeid
            basedir = os.path.join(ss.storedir, "shares", prefixdir)
            if not os.path.exists(basedir):
            for f in os.listdir(basedir):
                shares.append((shnum, serverid, os.path.join(basedir, f)))
        return sorted(shares)
272 def delete_share(self, (shnum, serverid, sharefile)):
275 def delete_shares_numbered(self, uri, shnums):
276 for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
277 if i_shnum in shnums:
278 os.unlink(i_sharefile)
280 def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
281 sharedata = open(sharefile, "rb").read()
282 corruptdata = corruptor_function(sharedata)
283 open(sharefile, "wb").write(corruptdata)
285 def corrupt_shares_numbered(self, uri, shnums, corruptor):
286 for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
287 if i_shnum in shnums:
288 sharedata = open(i_sharefile, "rb").read()
289 corruptdata = corruptor(sharedata)
290 open(i_sharefile, "wb").write(corruptdata)
    def GET(self, urlpath, followRedirect=False, return_response=False,
            method="GET", clientnum=0, **kwargs):
        # Issue an HTTP request against client #clientnum's webapi and
        # return the page-body Deferred.
        # if return_response=True, this fires with (data, statuscode,
        # respheaders) instead of just data.
        # NOTE(review): lines appear to be missing from this view: the
        # 'def _got_data(data):' header for the return below, and the
        # 'if return_response:' guard around the addCallback on 'd' (which
        # is itself never assigned here -- presumably factory.deferred).
        # Reconcile with upstream before restyling.
        assert not isinstance(urlpath, unicode)
        url = self.client_baseurls[clientnum] + urlpath
        factory = HTTPClientGETFactory(url, method=method,
                                       followRedirect=followRedirect, **kwargs)
        reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
        return (data, factory.status, factory.response_headers)
        d.addCallback(_got_data)
        return factory.deferred