]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/no_network.py
08ef42770c04bfe3788eca967f6ba43c34d1d5ea
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / no_network.py
1
2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
10
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
15
16 import os.path
17 from zope.interface import implements
18 from twisted.application import service
19 from twisted.internet import defer, reactor
20 from twisted.python.failure import Failure
21 from foolscap.api import Referenceable, fireEventually, RemoteException
22 from base64 import b32encode
23 from allmydata import uri as tahoe_uri
24 from allmydata.client import Client
25 from allmydata.storage.server import StorageServer, storage_index_to_dir
26 from allmydata.util import fileutil, idlib, hashutil
27 from allmydata.util.hashutil import sha1
28 from allmydata.test.common_web import HTTPClientGETFactory
29 from allmydata.interfaces import IStorageBroker, IServer
30 from allmydata.test.common import TEST_RSA_KEY_SIZE
31
32
33 class IntentionalError(Exception):
34     pass
35
36 class Marker:
37     pass
38
39 class LocalWrapper:
40     def __init__(self, original):
41         self.original = original
42         self.broken = False
43         self.hung_until = None
44         self.post_call_notifier = None
45         self.disconnectors = {}
46
47     def callRemoteOnly(self, methname, *args, **kwargs):
48         d = self.callRemote(methname, *args, **kwargs)
49         del d # explicitly ignored
50         return None
51
52     def callRemote(self, methname, *args, **kwargs):
53         # this is ideally a Membrane, but that's too hard. We do a shallow
54         # wrapping of inbound arguments, and per-methodname wrapping of
55         # selected return values.
56         def wrap(a):
57             if isinstance(a, Referenceable):
58                 return LocalWrapper(a)
59             else:
60                 return a
61         args = tuple([wrap(a) for a in args])
62         kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
63
64         def _really_call():
65             meth = getattr(self.original, "remote_" + methname)
66             return meth(*args, **kwargs)
67
68         def _call():
69             if self.broken:
70                 if self.broken is not True: # a counter, not boolean
71                     self.broken -= 1
72                 raise IntentionalError("I was asked to break")
73             if self.hung_until:
74                 d2 = defer.Deferred()
75                 self.hung_until.addCallback(lambda ign: _really_call())
76                 self.hung_until.addCallback(lambda res: d2.callback(res))
77                 def _err(res):
78                     d2.errback(res)
79                     return res
80                 self.hung_until.addErrback(_err)
81                 return d2
82             return _really_call()
83
84         d = fireEventually()
85         d.addCallback(lambda res: _call())
86         def _wrap_exception(f):
87             return Failure(RemoteException(f))
88         d.addErrback(_wrap_exception)
89         def _return_membrane(res):
90             # rather than complete the difficult task of building a
91             # fully-general Membrane (which would locate all Referenceable
92             # objects that cross the simulated wire and replace them with
93             # wrappers), we special-case certain methods that we happen to
94             # know will return Referenceables.
95             if methname == "allocate_buckets":
96                 (alreadygot, allocated) = res
97                 for shnum in allocated:
98                     allocated[shnum] = LocalWrapper(allocated[shnum])
99             if methname == "get_buckets":
100                 for shnum in res:
101                     res[shnum] = LocalWrapper(res[shnum])
102             return res
103         d.addCallback(_return_membrane)
104         if self.post_call_notifier:
105             d.addCallback(self.post_call_notifier, self, methname)
106         return d
107
108     def notifyOnDisconnect(self, f, *args, **kwargs):
109         m = Marker()
110         self.disconnectors[m] = (f, args, kwargs)
111         return m
112     def dontNotifyOnDisconnect(self, marker):
113         del self.disconnectors[marker]
114
115 def wrap_storage_server(original):
116     # Much of the upload/download code uses rref.version (which normally
117     # comes from rrefutil.add_version_to_remote_reference). To avoid using a
118     # network, we want a LocalWrapper here. Try to satisfy all these
119     # constraints at the same time.
120     wrapper = LocalWrapper(original)
121     wrapper.version = original.remote_get_version()
122     return wrapper
123
124 class NoNetworkServer:
125     implements(IServer)
126     def __init__(self, serverid, rref):
127         self.serverid = serverid
128         self.rref = rref
129     def __repr__(self):
130         return "<NoNetworkServer for %s>" % self.get_name()
131     # Special method used by copy.copy() and copy.deepcopy(). When those are
132     # used in allmydata.immutable.filenode to copy CheckResults during
133     # repair, we want it to treat the IServer instances as singletons.
134     def __copy__(self):
135         return self
136     def __deepcopy__(self, memodict):
137         return self
138     def get_serverid(self):
139         return self.serverid
140     def get_permutation_seed(self):
141         return self.serverid
142     def get_lease_seed(self):
143         return self.serverid
144     def get_foolscap_write_enabler_seed(self):
145         return self.serverid
146
147     def get_name(self):
148         return idlib.shortnodeid_b2a(self.serverid)
149     def get_longname(self):
150         return idlib.nodeid_b2a(self.serverid)
151     def get_nickname(self):
152         return "nickname"
153     def get_rref(self):
154         return self.rref
155     def get_version(self):
156         return self.rref.version
157
158 class NoNetworkStorageBroker:
159     implements(IStorageBroker)
160     def get_servers_for_psi(self, peer_selection_index):
161         def _permuted(server):
162             seed = server.get_permutation_seed()
163             return sha1(peer_selection_index + seed).digest()
164         return sorted(self.get_connected_servers(), key=_permuted)
165     def get_connected_servers(self):
166         return self.client._servers
167     def get_nickname_for_serverid(self, serverid):
168         return None
169
170 class NoNetworkClient(Client):
171     def create_tub(self):
172         pass
173     def init_introducer_client(self):
174         pass
175     def setup_logging(self):
176         pass
177     def startService(self):
178         service.MultiService.startService(self)
179     def stopService(self):
180         service.MultiService.stopService(self)
181     def when_tub_ready(self):
182         raise NotImplementedError("NoNetworkClient has no Tub")
183     def init_control(self):
184         pass
185     def init_helper(self):
186         pass
187     def init_key_gen(self):
188         pass
189     def init_storage(self):
190         pass
191     def init_client_storage_broker(self):
192         self.storage_broker = NoNetworkStorageBroker()
193         self.storage_broker.client = self
194     def init_stub_client(self):
195         pass
196     #._servers will be set by the NoNetworkGrid which creates us
197
198 class SimpleStats:
199     def __init__(self):
200         self.counters = {}
201         self.stats_producers = []
202
203     def count(self, name, delta=1):
204         val = self.counters.setdefault(name, 0)
205         self.counters[name] = val + delta
206
207     def register_producer(self, stats_producer):
208         self.stats_producers.append(stats_producer)
209
210     def get_stats(self):
211         stats = {}
212         for sp in self.stats_producers:
213             stats.update(sp.get_stats())
214         ret = { 'counters': self.counters, 'stats': stats }
215         return ret
216
217 class NoNetworkGrid(service.MultiService):
218     def __init__(self, basedir, num_clients=1, num_servers=10,
219                  client_config_hooks={}):
220         service.MultiService.__init__(self)
221         self.basedir = basedir
222         fileutil.make_dirs(basedir)
223
224         self.servers_by_number = {} # maps to StorageServer instance
225         self.wrappers_by_id = {} # maps to wrapped StorageServer instance
226         self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
227                                 # StorageServer
228         self.clients = []
229
230         for i in range(num_servers):
231             ss = self.make_server(i)
232             self.add_server(i, ss)
233         self.rebuild_serverlist()
234
235         for i in range(num_clients):
236             clientid = hashutil.tagged_hash("clientid", str(i))[:20]
237             clientdir = os.path.join(basedir, "clients",
238                                      idlib.shortnodeid_b2a(clientid))
239             fileutil.make_dirs(clientdir)
240             f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
241             f.write("[node]\n")
242             f.write("nickname = client-%d\n" % i)
243             f.write("web.port = tcp:0:interface=127.0.0.1\n")
244             f.write("[storage]\n")
245             f.write("enabled = false\n")
246             f.close()
247             c = None
248             if i in client_config_hooks:
249                 # this hook can either modify tahoe.cfg, or return an
250                 # entirely new Client instance
251                 c = client_config_hooks[i](clientdir)
252             if not c:
253                 c = NoNetworkClient(clientdir)
254                 c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
255             c.nodeid = clientid
256             c.short_nodeid = b32encode(clientid).lower()[:8]
257             c._servers = self.all_servers # can be updated later
258             c.setServiceParent(self)
259             self.clients.append(c)
260
261     def make_server(self, i, readonly=False):
262         serverid = hashutil.tagged_hash("serverid", str(i))[:20]
263         serverdir = os.path.join(self.basedir, "servers",
264                                  idlib.shortnodeid_b2a(serverid), "storage")
265         fileutil.make_dirs(serverdir)
266         ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(),
267                            readonly_storage=readonly)
268         ss._no_network_server_number = i
269         return ss
270
271     def add_server(self, i, ss):
272         # to deal with the fact that all StorageServers are named 'storage',
273         # we interpose a middleman
274         middleman = service.MultiService()
275         middleman.setServiceParent(self)
276         ss.setServiceParent(middleman)
277         serverid = ss.my_nodeid
278         self.servers_by_number[i] = ss
279         wrapper = wrap_storage_server(ss)
280         self.wrappers_by_id[serverid] = wrapper
281         self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
282         self.rebuild_serverlist()
283
284     def get_all_serverids(self):
285         return self.proxies_by_id.keys()
286
287     def rebuild_serverlist(self):
288         self.all_servers = frozenset(self.proxies_by_id.values())
289         for c in self.clients:
290             c._servers = self.all_servers
291
292     def remove_server(self, serverid):
293         # it's enough to remove the server from c._servers (we don't actually
294         # have to detach and stopService it)
295         for i,ss in self.servers_by_number.items():
296             if ss.my_nodeid == serverid:
297                 del self.servers_by_number[i]
298                 break
299         del self.wrappers_by_id[serverid]
300         del self.proxies_by_id[serverid]
301         self.rebuild_serverlist()
302         return ss
303
304     def break_server(self, serverid, count=True):
305         # mark the given server as broken, so it will throw exceptions when
306         # asked to hold a share or serve a share. If count= is a number,
307         # throw that many exceptions before starting to work again.
308         self.wrappers_by_id[serverid].broken = count
309
310     def hang_server(self, serverid):
311         # hang the given server
312         ss = self.wrappers_by_id[serverid]
313         assert ss.hung_until is None
314         ss.hung_until = defer.Deferred()
315
316     def unhang_server(self, serverid):
317         # unhang the given server
318         ss = self.wrappers_by_id[serverid]
319         assert ss.hung_until is not None
320         ss.hung_until.callback(None)
321         ss.hung_until = None
322
323
324 class GridTestMixin:
325     def setUp(self):
326         self.s = service.MultiService()
327         self.s.startService()
328
329     def tearDown(self):
330         return self.s.stopService()
331
332     def set_up_grid(self, num_clients=1, num_servers=10,
333                     client_config_hooks={}):
334         # self.basedir must be set
335         self.g = NoNetworkGrid(self.basedir,
336                                num_clients=num_clients,
337                                num_servers=num_servers,
338                                client_config_hooks=client_config_hooks)
339         self.g.setServiceParent(self.s)
340         self.client_webports = [c.getServiceNamed("webish").getPortnum()
341                                 for c in self.g.clients]
342         self.client_baseurls = [c.getServiceNamed("webish").getURL()
343                                 for c in self.g.clients]
344
345     def get_clientdir(self, i=0):
346         return self.g.clients[i].basedir
347
348     def get_serverdir(self, i):
349         return self.g.servers_by_number[i].storedir
350
351     def iterate_servers(self):
352         for i in sorted(self.g.servers_by_number.keys()):
353             ss = self.g.servers_by_number[i]
354             yield (i, ss, ss.storedir)
355
356     def find_uri_shares(self, uri):
357         si = tahoe_uri.from_string(uri).get_storage_index()
358         prefixdir = storage_index_to_dir(si)
359         shares = []
360         for i,ss in self.g.servers_by_number.items():
361             serverid = ss.my_nodeid
362             basedir = os.path.join(ss.sharedir, prefixdir)
363             if not os.path.exists(basedir):
364                 continue
365             for f in os.listdir(basedir):
366                 try:
367                     shnum = int(f)
368                     shares.append((shnum, serverid, os.path.join(basedir, f)))
369                 except ValueError:
370                     pass
371         return sorted(shares)
372
373     def copy_shares(self, uri):
374         shares = {}
375         for (shnum, serverid, sharefile) in self.find_uri_shares(uri):
376             shares[sharefile] = open(sharefile, "rb").read()
377         return shares
378
379     def restore_all_shares(self, shares):
380         for sharefile, data in shares.items():
381             open(sharefile, "wb").write(data)
382
383     def delete_share(self, (shnum, serverid, sharefile)):
384         os.unlink(sharefile)
385
386     def delete_shares_numbered(self, uri, shnums):
387         for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
388             if i_shnum in shnums:
389                 os.unlink(i_sharefile)
390
391     def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
392         sharedata = open(sharefile, "rb").read()
393         corruptdata = corruptor_function(sharedata)
394         open(sharefile, "wb").write(corruptdata)
395
396     def corrupt_shares_numbered(self, uri, shnums, corruptor, debug=False):
397         for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
398             if i_shnum in shnums:
399                 sharedata = open(i_sharefile, "rb").read()
400                 corruptdata = corruptor(sharedata, debug=debug)
401                 open(i_sharefile, "wb").write(corruptdata)
402
403     def corrupt_all_shares(self, uri, corruptor, debug=False):
404         for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
405             sharedata = open(i_sharefile, "rb").read()
406             corruptdata = corruptor(sharedata, debug=debug)
407             open(i_sharefile, "wb").write(corruptdata)
408
409     def GET(self, urlpath, followRedirect=False, return_response=False,
410             method="GET", clientnum=0, **kwargs):
411         # if return_response=True, this fires with (data, statuscode,
412         # respheaders) instead of just data.
413         assert not isinstance(urlpath, unicode)
414         url = self.client_baseurls[clientnum] + urlpath
415         factory = HTTPClientGETFactory(url, method=method,
416                                        followRedirect=followRedirect, **kwargs)
417         reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
418         d = factory.deferred
419         def _got_data(data):
420             return (data, factory.status, factory.response_headers)
421         if return_response:
422             d.addCallback(_got_data)
423         return factory.deferred
424
425     def PUT(self, urlpath, **kwargs):
426         return self.GET(urlpath, method="PUT", **kwargs)