]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/no_network.py
test/no_network: do startService on the storage servers, make it easier to customize...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / no_network.py
1
2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
10
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
15
16 import os.path
17 import sha
18 from twisted.application import service
19 from foolscap import Referenceable
20 from foolscap.eventual import fireEventually
21 from base64 import b32encode
22 from allmydata import uri as tahoe_uri
23 from allmydata.client import Client
24 from allmydata.storage.server import StorageServer, storage_index_to_dir
25 from allmydata.util import fileutil, idlib, hashutil, rrefutil
26 from allmydata.introducer.client import RemoteServiceConnector
27
class IntentionalError(Exception):
    """Raised by a LocalWrapper whose 'broken' flag has been set."""
30
class Marker:
    """Opaque token: each instance is a distinct, hashable dictionary key."""
33
class LocalWrapper:
    """Stand-in for a remote reference that calls a local object directly.

    callRemote("foo", ...) ends up invoking self.original.remote_foo(...).
    The invocation happens from a callback on the Deferred returned by
    fireEventually(), not synchronously inside callRemote().
    """

    def __init__(self, original):
        self.original = original
        # while true, every call fails with IntentionalError
        self.broken = False
        # optional callable(result, methname), chained onto each call's
        # Deferred; whatever it returns becomes the result seen by the caller
        self.post_call_notifier = None
        # maps Marker -> (f, args, kwargs), filled in by notifyOnDisconnect
        self.disconnectors = {}

    def callRemoteOnly(self, methname, *args, **kwargs):
        """Start the call like callRemote(), but discard its Deferred."""
        self.callRemote(methname, *args, **kwargs)
        return None

    def callRemote(self, methname, *args, **kwargs):
        """Invoke remote_<methname> on the wrapped object; return a Deferred.

        This is ideally a Membrane, but that's too hard. Inbound arguments
        get a shallow wrapping (only top-level Referenceables are replaced),
        and return values are wrapped for a few known method names only.
        """
        def _wrap_if_referenceable(obj):
            if not isinstance(obj, Referenceable):
                return obj
            return LocalWrapper(obj)

        wrapped_args = tuple(_wrap_if_referenceable(a) for a in args)
        wrapped_kwargs = dict((name, _wrap_if_referenceable(value))
                              for (name, value) in kwargs.items())

        def _invoke(ignored):
            # 'broken' is consulted when the call is delivered, not when it
            # was requested
            if self.broken:
                raise IntentionalError("I was asked to break")
            target = getattr(self.original, "remote_" + methname)
            return target(*wrapped_args, **wrapped_kwargs)

        def _wrap_buckets(buckets):
            # replace each value in place; the set of keys is untouched
            for shnum in buckets:
                buckets[shnum] = LocalWrapper(buckets[shnum])

        def _return_membrane(res):
            # rather than complete the difficult task of building a
            # fully-general Membrane (which would locate all Referenceable
            # objects that cross the simulated wire and replace them with
            # wrappers), we special-case certain methods that we happen to
            # know will return Referenceables.
            if methname == "allocate_buckets":
                (_alreadygot, allocated) = res
                _wrap_buckets(allocated)
            elif methname == "get_buckets":
                _wrap_buckets(res)
            return res

        d = fireEventually()
        d.addCallback(_invoke)
        d.addCallback(_return_membrane)
        notifier = self.post_call_notifier
        if notifier:
            d.addCallback(notifier, methname)
        return d

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """Record (f, args, kwargs) and return the Marker that identifies it."""
        marker = Marker()
        self.disconnectors[marker] = (f, args, kwargs)
        return marker

    def dontNotifyOnDisconnect(self, marker):
        """Forget a registration; raises KeyError for an unknown marker."""
        del self.disconnectors[marker]
88
def wrap(original, service_name):
    """Return a WrappedRemoteReference around a LocalWrapper for 'original'.

    The code in immutable.checker insists upon asserting the truth of
    isinstance(rref, rrefutil.WrappedRemoteReference). Much of the
    upload/download code uses rref.version (which normally comes from
    rrefutil.VersionedRemoteReference). To avoid using a network, we want a
    LocalWrapper here. Try to satisfy all these constraints at the same
    time.

    The .version attribute comes from original.remote_get_version(); if that
    raises AttributeError, the defaults registered for service_name in
    RemoteServiceConnector.VERSION_DEFAULTS are used instead.
    """
    rref = rrefutil.WrappedRemoteReference(LocalWrapper(original))
    try:
        rref_version = original.remote_get_version()
    except AttributeError:
        rref_version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
    rref.version = rref_version
    return rref
104
class NoNetworkClient(Client):
    """Client whose Tub, introducer, logging, control, helper, key-generator,
    storage and stub-client setup hooks are all overridden to do nothing.

    Server lookups are answered from self._servers, which this class never
    sets itself: it must be assigned from outside before the lookups are
    used.
    """

    def create_tub(self):
        """Do nothing."""

    def init_introducer_client(self):
        """Do nothing."""

    def setup_logging(self):
        """Do nothing."""

    def startService(self):
        """Run MultiService's startService directly."""
        service.MultiService.startService(self)

    def stopService(self):
        """Run MultiService's stopService directly; its result is dropped."""
        service.MultiService.stopService(self)

    def when_tub_ready(self):
        """Always raise RuntimeError."""
        raise RuntimeError("NoNetworkClient has no Tub")

    def init_control(self):
        """Do nothing."""

    def init_helper(self):
        """Do nothing."""

    def init_key_gen(self):
        """Do nothing."""

    def init_storage(self):
        """Do nothing."""

    def init_stub_client(self):
        """Do nothing."""

    def get_servers(self, service_name):
        """Return self._servers; service_name is not consulted."""
        return self._servers

    def get_permuted_peers(self, service_name, key):
        """Return the entries of self._servers as a list, ordered by the sha
        digest of key + entry[0]; service_name is not consulted."""
        def _permutation_key(entry):
            return sha.new(key + entry[0]).digest()
        return sorted(self._servers, key=_permutation_key)

    def get_nickname_for_peerid(self, peerid):
        """Always return None."""
        return None
137
138
class NoNetworkGrid(service.MultiService):
    """A MultiService holding the storage servers and clients of one grid.

    Attributes:
      servers_by_number: maps server number -> StorageServer instance
      servers_by_id: maps serverid -> wrap()ped StorageServer
      all_servers: frozenset of the (serverid, wrapped server) pairs found
                   in servers_by_id; handed to every client as c._servers
      clients: list of client instances, in creation order
    """

    def __init__(self, basedir, num_clients=1, num_servers=10,
                 client_config_hooks=None):
        """Create num_servers storage servers, then num_clients clients.

        client_config_hooks maps a client number to a callable which is
        invoked with that client's directory once the default tahoe.cfg has
        been written there. The hook may rewrite tahoe.cfg and return None,
        or it may return a client instance to be used in place of a
        NoNetworkClient. None (the default) means "no hooks".
        """
        service.MultiService.__init__(self)
        if client_config_hooks is None:
            client_config_hooks = {}
        self.basedir = basedir
        fileutil.make_dirs(basedir)

        self.servers_by_number = {}
        self.servers_by_id = {}
        self.clients = []
        # add_server() rebuilds this on every call; start it out empty so a
        # grid created with num_servers=0 can still hand it to its clients
        self.all_servers = frozenset()

        for i in range(num_servers):
            ss = self.make_server(i)
            self.add_server(i, ss)

        for i in range(num_clients):
            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
            clientdir = os.path.join(basedir, "clients",
                                     idlib.shortnodeid_b2a(clientid))
            fileutil.make_dirs(clientdir)
            f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
            try:
                f.write("[node]\n")
                f.write("nickname = client-%d\n" % i)
                f.write("web.port = tcp:0:interface=127.0.0.1\n")
                f.write("[storage]\n")
                f.write("enabled = false\n")
            finally:
                # close the handle even if one of the writes fails
                f.close()
            c = None
            if i in client_config_hooks:
                # this hook can either modify tahoe.cfg, or return an
                # entirely new Client instance
                c = client_config_hooks[i](clientdir)
            if not c:
                c = NoNetworkClient(clientdir)
            c.nodeid = clientid
            c.short_nodeid = b32encode(clientid).lower()[:8]
            c._servers = self.all_servers # can be updated later
            c.setServiceParent(self)
            self.clients.append(c)

    def make_server(self, i):
        """Create the directory for server number i and return a new
        StorageServer rooted there. The server is not attached to the grid;
        add_server() does that."""
        serverid = hashutil.tagged_hash("serverid", str(i))[:20]
        serverdir = os.path.join(self.basedir, "servers",
                                 idlib.shortnodeid_b2a(serverid))
        fileutil.make_dirs(serverdir)
        ss = StorageServer(serverdir, serverid)
        return ss

    def add_server(self, i, ss):
        """Attach storage server ss to the grid as server number i, and
        give every existing client the refreshed set of servers."""
        # to deal with the fact that all StorageServers are named 'storage',
        # we interpose a middleman
        middleman = service.MultiService()
        middleman.setServiceParent(self)
        ss.setServiceParent(middleman)
        serverid = ss.my_nodeid
        self.servers_by_number[i] = ss
        self.servers_by_id[serverid] = wrap(ss, "storage")
        # a new frozenset is built each time, so clients must be re-pointed
        self.all_servers = frozenset(self.servers_by_id.items())
        for c in self.clients:
            c._servers = self.all_servers
199
class GridTestMixin:
    """Test-case mixin that builds a NoNetworkGrid under a parent service.

    self.basedir must be set before set_up_grid() is called.
    """

    def setUp(self):
        """Create and start the parent MultiService, stored as self.s."""
        parent = service.MultiService()
        self.s = parent
        parent.startService()

    def tearDown(self):
        """Stop the parent service, returning whatever stopService() gives."""
        return self.s.stopService()

    def set_up_grid(self, num_clients=1, num_servers=10,
                    client_config_hooks={}):
        """Build the grid as self.g and attach it to self.s.

        Also records each client's web port in self.client_webports and the
        matching "http://localhost:<port>/" URL in self.client_baseurls.
        """
        # self.basedir must be set
        grid = NoNetworkGrid(self.basedir,
                             num_clients=num_clients,
                             num_servers=num_servers,
                             client_config_hooks=client_config_hooks)
        self.g = grid
        grid.setServiceParent(self.s)
        webports = []
        for c in self.g.clients:
            listener = c.getServiceNamed("webish").listener
            webports.append(listener._port.getHost().port)
        self.client_webports = webports
        self.client_baseurls = ["http://localhost:%d/" % port
                                for port in webports]

    def get_clientdir(self, i=0):
        """Return the basedir of client number i."""
        client = self.g.clients[i]
        return client.basedir

    def get_serverdir(self, i):
        """Return the storedir of server number i."""
        server = self.g.servers_by_number[i]
        return server.storedir

    def iterate_servers(self):
        """Yield (number, server, storedir), in ascending server number."""
        servers = self.g.servers_by_number
        for num in sorted(servers):
            server = servers[num]
            yield (num, server, server.storedir)

    def find_shares(self, uri):
        """Return a sorted list of (shnum, serverid, filename) tuples, one
        for each share file found on disk for the given URI string.

        Directory entries whose names are not integers are skipped.
        """
        storage_index = tahoe_uri.from_string(uri).get_storage_index()
        prefixdir = storage_index_to_dir(storage_index)
        found = []
        for server in self.g.servers_by_number.values():
            serverid = server.my_nodeid
            sharedir = os.path.join(server.storedir, "shares", prefixdir)
            if os.path.exists(sharedir):
                for name in os.listdir(sharedir):
                    try:
                        shnum = int(name)
                    except ValueError:
                        continue
                    found.append((shnum, serverid,
                                  os.path.join(sharedir, name)))
        found.sort()
        return found
248