2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
18 from twisted.application import service
19 from foolscap import Referenceable
20 from foolscap.eventual import fireEventually
21 from base64 import b32encode
22 from allmydata import uri as tahoe_uri
23 from allmydata.client import Client
24 from allmydata.storage.server import StorageServer, storage_index_to_dir
25 from allmydata.util import fileutil, idlib, hashutil, rrefutil
26 from allmydata.introducer.client import RemoteServiceConnector
# Exception type raised from callRemote() in this file, with the message
# "I was asked to break".
# NOTE(review): the class body is not visible in this listing -- confirm it
# adds nothing beyond Exception.
28 class IntentionalError(Exception):
# Store the object being wrapped and set up per-wrapper bookkeeping.
# NOTE(review): the listing jumps from inner line 36 to 38, so an attribute
# assigned in between is not visible here -- confirm against the full file.
35 def __init__(self, original):
# the wrapped object; callRemote() looks up "remote_<methname>" on it
36 self.original = original
# optional callable; when truthy, callRemote() registers it on d with the
# method name as an extra argument
38 self.post_call_notifier = None
# key -> (f, args, kwargs), filled in by notifyOnDisconnect() and pruned by
# dontNotifyOnDisconnect()
39 self.disconnectors = {}
# Forward methname/args/kwargs unchanged to callRemote().
# NOTE(review): whatever follows the call (a return statement, if any) is not
# visible in this listing; presumably the result d is discarded, as the
# "Only" suffix suggests -- confirm.
41 def callRemoteOnly(self, methname, *args, **kwargs):
42 d = self.callRemote(methname, *args, **kwargs)
# Invoke "remote_<methname>" on self.original with wrapped arguments, then
# post-process the result through callbacks registered on d.
# NOTE(review): several lines of this method are not visible in this listing
# (the headers of the wrap/_call helpers used below, the statement that
# creates d, and the return statements). The comments below describe only
# the lines that are shown.
45 def callRemote(self, methname, *args, **kwargs):
46 # this is ideally a Membrane, but that's too hard. We do a shallow
47 # wrapping of inbound arguments, and per-methodname wrapping of
48 # selected return values.
# a Referenceable argument is replaced by a LocalWrapper around it
50 if isinstance(a, Referenceable):
51 return LocalWrapper(a)
# wrap() is applied to each positional and keyword argument (top level only;
# containers are not searched)
54 args = tuple([wrap(a) for a in args])
55 kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
# NOTE(review): the condition guarding this raise is not visible -- confirm
58 raise IntentionalError("I was asked to break")
# dispatch to the wrapped object's remote_<methname> method
59 meth = getattr(self.original, "remote_" + methname)
60 return meth(*args, **kwargs)
# the value d fires with is ignored; the callback just runs _call()
62 d.addCallback(lambda res: _call())
63 def _return_membrane(res):
64 # rather than complete the difficult task of building a
65 # fully-general Membrane (which would locate all Referenceable
66 # objects that cross the simulated wire and replace them with
67 # wrappers), we special-case certain methods that we happen to
68 # know will return Referenceables.
69 if methname == "allocate_buckets":
# res unpacks into (alreadygot, allocated); every value of allocated is
# replaced in place by a LocalWrapper
70 (alreadygot, allocated) = res
71 for shnum in allocated:
72 allocated[shnum] = LocalWrapper(allocated[shnum])
73 if methname == "get_buckets":
# entries of res itself are replaced by LocalWrapper instances
# NOTE(review): the loop header binding shnum is not visible here
75 res[shnum] = LocalWrapper(res[shnum])
77 d.addCallback(_return_membrane)
# optional hook, registered with methname as an extra argument
78 if self.post_call_notifier:
79 d.addCallback(self.post_call_notifier, methname)
# Record a disconnect handler: (f, args, kwargs) is stored in
# self.disconnectors under the key m.
# NOTE(review): the statement that creates m, and any return value, are not
# visible in this listing -- presumably m is handed back so it can later be
# passed to dontNotifyOnDisconnect(); confirm.
82 def notifyOnDisconnect(self, f, *args, **kwargs):
84 self.disconnectors[m] = (f, args, kwargs)
def dontNotifyOnDisconnect(self, marker):
    """Forget the disconnect handler stored under ``marker``.

    Raises KeyError if ``marker`` is not a key of ``self.disconnectors``.
    """
    # pop() without a default raises KeyError for an unknown key, like del
    self.disconnectors.pop(marker)
# Build a rrefutil.WrappedRemoteReference around a LocalWrapper for
# `original`, and attach a `version` attribute to it.
#   original:     object to wrap; its remote_get_version() supplies the
#                 version when the call succeeds
#   service_name: key into RemoteServiceConnector.VERSION_DEFAULTS, used as
#                 the fallback when AttributeError is raised
# NOTE(review): the `try:` line, the end of the comment below, and whatever
# follows the version assignment (presumably `return wrapped`) are not
# visible in this listing -- confirm against the full file.
89 def wrap(original, service_name):
90 # The code in immutable.checker insists upon asserting the truth of
91 # isinstance(rref, rrefutil.WrappedRemoteReference). Much of the
92 # upload/download code uses rref.version (which normally comes from
93 # rrefutil.VersionedRemoteReference). To avoid using a network, we want a
94 # LocalWrapper here. Try to satisfy all these constraints at the same
96 local = LocalWrapper(original)
97 wrapped = rrefutil.WrappedRemoteReference(local)
# the version is read from `original` directly, not through the wrappers
99 version = original.remote_get_version()
100 except AttributeError:
101 version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
102 wrapped.version = version
# Client subclass used for the clients of the in-process grid (instantiated
# by NoNetworkGrid.__init__ with a client directory).
# NOTE(review): the bodies of the three methods below are not visible in this
# listing; presumably each replaces the Client method of the same name with
# a no-op so that no Tub, introducer connection or log setup is created --
# confirm against the full file.
105 class NoNetworkClient(Client):
107 def create_tub(self):
109 def init_introducer_client(self):
111 def setup_logging(self):
def startService(self):
    """Start this client using MultiService.startService directly.

    NOTE(review): the call names service.MultiService explicitly instead of
    going through the Client base class -- presumably to skip Client's own
    start-up work; confirm against allmydata.client.
    """
    start = service.MultiService.startService
    start(self)
def stopService(self):
    """Stop this client using MultiService.stopService directly.

    Returns whatever MultiService.stopService returns, so that a parent
    service (or a test's tearDown) can wait for child services to finish
    shutting down instead of racing ahead. Previously the result was
    dropped and None was returned.

    NOTE(review): the call names service.MultiService explicitly instead of
    going through the Client base class -- presumably to skip Client's own
    shutdown work; confirm against allmydata.client.
    """
    return service.MultiService.stopService(self)
def when_tub_ready(self):
    """Always fail: raises RuntimeError("NoNetworkClient has no Tub")."""
    no_tub = RuntimeError("NoNetworkClient has no Tub")
    raise no_tub
# NOTE(review): none of the bodies of the methods below are visible in this
# listing. The init_* methods presumably replace the Client initialisers of
# the same names with no-ops (the file header says these clients have no
# Helper, KeyGenerator or control.furl); get_servers(service_name)
# presumably reports self._servers -- confirm all of this against the full
# file before relying on it.
119 def init_control(self):
121 def init_helper(self):
123 def init_key_gen(self):
125 def init_storage(self):
127 def init_stub_client(self):
130 def get_servers(self, service_name):
def get_permuted_peers(self, service_name, key):
    """Return this client's servers in the permuted order selected by ``key``.

    Each entry of ``self._servers`` is a ``(serverid, server reference)``
    pair. Entries are ordered by the SHA-1 digest of ``key + serverid``, so
    a given key always produces the same ordering.

    ``service_name`` is accepted for interface compatibility and is not
    consulted. Returns a new list; ``self._servers`` is left untouched.
    """
    # hashlib.sha1 yields the same digest as the long-deprecated sha.new(),
    # and unlike the sha module it still exists on current Pythons.
    import hashlib

    def permuted_position(server):
        return hashlib.sha1(key + server[0]).digest()

    return sorted(self._servers, key=permuted_position)
# NOTE(review): the body of this method is not visible in this listing, so
# what it returns for a given peerid cannot be stated from here -- confirm
# against the full file.
135 def get_nickname_for_peerid(self, peerid):
# MultiService that owns num_servers storage servers and num_clients clients,
# all created under basedir.
139 class NoNetworkGrid(service.MultiService):
# Parameters:
#   basedir:             root directory, passed to fileutil.make_dirs();
#                        client dirs go under <basedir>/clients/
#   num_clients:         number of clients to create
#   num_servers:         number of storage servers to create
#   client_config_hooks: mapping of client index -> callable(clientdir)
# NOTE(review): some lines of this method are not visible in this listing
# (including the initialisation of self.clients, which add_server() and the
# end of this method both use, and part of the tahoe.cfg writing).
# NOTE(review): client_config_hooks has a mutable default; the visible code
# only reads it (membership test and lookup), never mutates it.
140 def __init__(self, basedir, num_clients=1, num_servers=10,
141 client_config_hooks={}):
142 service.MultiService.__init__(self)
143 self.basedir = basedir
144 fileutil.make_dirs(basedir)
# server number -> StorageServer, and serverid -> wrapped reference; both
# are filled in by add_server()
146 self.servers_by_number = {}
147 self.servers_by_id = {}
150 for i in range(num_servers):
151 ss = self.make_server(i)
152 self.add_server(i, ss)
154 for i in range(num_clients):
# the client id is derived from the client index, truncated to 20
155 clientid = hashutil.tagged_hash("clientid", str(i))[:20]
156 clientdir = os.path.join(basedir, "clients",
157 idlib.shortnodeid_b2a(clientid))
158 fileutil.make_dirs(clientdir)
# NOTE(review): no f.close() is visible in this listing -- confirm the file
# is closed before the client object reads tahoe.cfg
159 f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
161 f.write("nickname = client-%d\n" % i)
162 f.write("web.port = tcp:0:interface=127.0.0.1\n")
163 f.write("[storage]\n")
164 f.write("enabled = false\n")
167 if i in client_config_hooks:
168 # this hook can either modify tahoe.cfg, or return an
169 # entirely new Client instance
170 c = client_config_hooks[i](clientdir)
# NOTE(review): the condition under which this default client is created
# (a line between the two assignments to c) is not visible here
172 c = NoNetworkClient(clientdir)
174 c.short_nodeid = b32encode(clientid).lower()[:8]
175 c._servers = self.all_servers # can be updated later
176 c.setServiceParent(self)
177 self.clients.append(c)
# Create the StorageServer for server number i, with its directory at
# <self.basedir>/servers/<idlib.shortnodeid_b2a(serverid)>.
# NOTE(review): no return statement is visible in this listing, yet
# __init__ uses the result of make_server(i) -- presumably `return ss`
# follows; confirm against the full file.
179 def make_server(self, i):
# the server id is derived from the server number, truncated to 20
180 serverid = hashutil.tagged_hash("serverid", str(i))[:20]
181 serverdir = os.path.join(self.basedir, "servers",
182 idlib.shortnodeid_b2a(serverid))
183 fileutil.make_dirs(serverdir)
184 ss = StorageServer(serverdir, serverid)
def add_server(self, i, ss):
    """Attach storage server ``ss`` to the grid as server number ``i``.

    The server is recorded in ``servers_by_number`` and, wrapped via
    ``wrap(ss, "storage")``, in ``servers_by_id``. ``self.all_servers`` is
    then rebuilt as a frozenset of the ``servers_by_id`` items and handed to
    every client in ``self.clients``.
    """
    # all StorageServers are named 'storage', so each one gets its own
    # intermediate MultiService as a parent instead of sharing this one
    go_between = service.MultiService()
    go_between.setServiceParent(self)
    ss.setServiceParent(go_between)

    nodeid = ss.my_nodeid
    self.servers_by_number[i] = ss
    self.servers_by_id[nodeid] = wrap(ss, "storage")

    # publish one fresh snapshot and point every existing client at it
    snapshot = frozenset(self.servers_by_id.items())
    self.all_servers = snapshot
    for client in self.clients:
        client._servers = snapshot
# NOTE(review): the `def` lines (and the enclosing class header) for the
# statements below are not visible in this listing; by their content they
# look like a test fixture's set-up and tear-down -- confirm.
# create and start a fresh parent MultiService, kept as self.s
# (set_up_grid() parents the grid to it)
202 self.s = service.MultiService()
203 self.s.startService()
# stop self.s and hand back whatever stopService() returns
206 return self.s.stopService()
def set_up_grid(self, num_clients=1, num_servers=10,
                client_config_hooks={}):
    """Create a NoNetworkGrid under ``self.basedir`` and attach it to ``self.s``.

    Sets ``self.g`` to the grid, ``self.client_webports`` to the port of
    each client's "webish" listener, and ``self.client_baseurls`` to the
    matching "http://localhost:<port>/" strings. The arguments are passed
    through to NoNetworkGrid unchanged.
    """
    # self.basedir must be set
    grid = NoNetworkGrid(self.basedir,
                         num_clients=num_clients,
                         num_servers=num_servers,
                         client_config_hooks=client_config_hooks)
    self.g = grid
    grid.setServiceParent(self.s)

    webports = []
    for client in grid.clients:
        webish = client.getServiceNamed("webish")
        webports.append(webish.listener._port.getHost().port)
    self.client_webports = webports
    self.client_baseurls = ["http://localhost:%d/" % port
                            for port in webports]
def get_clientdir(self, i=0):
    """Return the ``basedir`` of client number ``i`` (default: client 0)."""
    client = self.g.clients[i]
    return client.basedir
def get_serverdir(self, i):
    """Return the ``storedir`` of storage server number ``i``."""
    server = self.g.servers_by_number[i]
    return server.storedir
def iterate_servers(self):
    """Yield ``(number, server, storedir)`` per server, by ascending number."""
    for number in sorted(self.g.servers_by_number):
        # look the server up at yield time, not up front
        server = self.g.servers_by_number[number]
        yield (number, server, server.storedir)
# Look on every server in self.g.servers_by_number for share files that
# belong to `uri`, and return them as a sorted list of
# (shnum, serverid, path-to-share-file) tuples.
# NOTE(review): several lines are not visible in this listing: the
# initialisation of `shares`, the body of the "directory does not exist"
# branch (presumably it skips that server), and the code that derives shnum
# from each directory entry f -- confirm against the full file.
232 def find_shares(self, uri):
# the storage index of the URI determines the per-server share directory
233 si = tahoe_uri.from_string(uri).get_storage_index()
234 prefixdir = storage_index_to_dir(si)
236 for i,ss in self.g.servers_by_number.items():
237 serverid = ss.my_nodeid
# shares for this storage index live under <storedir>/shares/<prefixdir>
238 basedir = os.path.join(ss.storedir, "shares", prefixdir)
239 if not os.path.exists(basedir):
241 for f in os.listdir(basedir):
244 shares.append((shnum, serverid, os.path.join(basedir, f)))
# sorted on the tuples, i.e. by shnum first, then serverid, then path
247 return sorted(shares)