]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/no_network.py
big rework of introducer client: change local API, split division of responsibilites...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / no_network.py
1
2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
10
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
15
16 import os.path
17 import sha
18 from zope.interface import implements
19 from twisted.application import service
20 from twisted.internet import reactor
21 from twisted.python.failure import Failure
22 from foolscap.api import Referenceable, fireEventually, RemoteException
23 from base64 import b32encode
24 from allmydata import uri as tahoe_uri
25 from allmydata.client import Client
26 from allmydata.storage.server import StorageServer, storage_index_to_dir
27 from allmydata.util import fileutil, idlib, hashutil
28 from allmydata.test.common_web import HTTPClientGETFactory
29 from allmydata.interfaces import IStorageBroker
30
31 class IntentionalError(Exception):
32     pass
33
34 class Marker:
35     pass
36
37 class LocalWrapper:
38     def __init__(self, original):
39         self.original = original
40         self.broken = False
41         self.post_call_notifier = None
42         self.disconnectors = {}
43
44     def callRemoteOnly(self, methname, *args, **kwargs):
45         d = self.callRemote(methname, *args, **kwargs)
46         return None
47
48     def callRemote(self, methname, *args, **kwargs):
49         # this is ideally a Membrane, but that's too hard. We do a shallow
50         # wrapping of inbound arguments, and per-methodname wrapping of
51         # selected return values.
52         def wrap(a):
53             if isinstance(a, Referenceable):
54                 return LocalWrapper(a)
55             else:
56                 return a
57         args = tuple([wrap(a) for a in args])
58         kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
59         def _call():
60             if self.broken:
61                 raise IntentionalError("I was asked to break")
62             meth = getattr(self.original, "remote_" + methname)
63             return meth(*args, **kwargs)
64         d = fireEventually()
65         d.addCallback(lambda res: _call())
66         def _wrap_exception(f):
67             return Failure(RemoteException(f))
68         d.addErrback(_wrap_exception)
69         def _return_membrane(res):
70             # rather than complete the difficult task of building a
71             # fully-general Membrane (which would locate all Referenceable
72             # objects that cross the simulated wire and replace them with
73             # wrappers), we special-case certain methods that we happen to
74             # know will return Referenceables.
75             if methname == "allocate_buckets":
76                 (alreadygot, allocated) = res
77                 for shnum in allocated:
78                     allocated[shnum] = LocalWrapper(allocated[shnum])
79             if methname == "get_buckets":
80                 for shnum in res:
81                     res[shnum] = LocalWrapper(res[shnum])
82             return res
83         d.addCallback(_return_membrane)
84         if self.post_call_notifier:
85             d.addCallback(self.post_call_notifier, methname)
86         return d
87
88     def notifyOnDisconnect(self, f, *args, **kwargs):
89         m = Marker()
90         self.disconnectors[m] = (f, args, kwargs)
91         return m
92     def dontNotifyOnDisconnect(self, marker):
93         del self.disconnectors[marker]
94
95 def wrap_storage_server(original):
96     # Much of the upload/download code uses rref.version (which normally
97     # comes from rrefutil.add_version_to_remote_reference). To avoid using a
98     # network, we want a LocalWrapper here. Try to satisfy all these
99     # constraints at the same time.
100     wrapper = LocalWrapper(original)
101     wrapper.version = original.remote_get_version()
102     return wrapper
103
104 class NoNetworkStorageBroker:
105     implements(IStorageBroker)
106     def get_servers_for_index(self, key):
107         return sorted(self.client._servers,
108                       key=lambda x: sha.new(key+x[0]).digest())
109     def get_all_servers(self):
110         return frozenset(self.client._servers)
111     def get_nickname_for_serverid(self, serverid):
112         return None
113
114 class NoNetworkClient(Client):
115
116     def create_tub(self):
117         pass
118     def init_introducer_client(self):
119         pass
120     def setup_logging(self):
121         pass
122     def startService(self):
123         service.MultiService.startService(self)
124     def stopService(self):
125         service.MultiService.stopService(self)
126     def when_tub_ready(self):
127         raise NotImplementedError("NoNetworkClient has no Tub")
128     def init_control(self):
129         pass
130     def init_helper(self):
131         pass
132     def init_key_gen(self):
133         pass
134     def init_storage(self):
135         pass
136     def init_client_storage_broker(self):
137         self.storage_broker = NoNetworkStorageBroker()
138         self.storage_broker.client = self
139     def init_stub_client(self):
140         pass
141     #._servers will be set by the NoNetworkGrid which creates us
142
143 class SimpleStats:
144     def __init__(self):
145         self.counters = {}
146         self.stats_producers = []
147
148     def count(self, name, delta=1):
149         val = self.counters.setdefault(name, 0)
150         self.counters[name] = val + delta
151
152     def register_producer(self, stats_producer):
153         self.stats_producers.append(stats_producer)
154
155     def get_stats(self):
156         stats = {}
157         for sp in self.stats_producers:
158             stats.update(sp.get_stats())
159         ret = { 'counters': self.counters, 'stats': stats }
160         return ret
161
162 class NoNetworkGrid(service.MultiService):
163     def __init__(self, basedir, num_clients=1, num_servers=10,
164                  client_config_hooks={}):
165         service.MultiService.__init__(self)
166         self.basedir = basedir
167         fileutil.make_dirs(basedir)
168
169         self.servers_by_number = {}
170         self.servers_by_id = {}
171         self.clients = []
172
173         for i in range(num_servers):
174             ss = self.make_server(i)
175             self.add_server(i, ss)
176
177         for i in range(num_clients):
178             clientid = hashutil.tagged_hash("clientid", str(i))[:20]
179             clientdir = os.path.join(basedir, "clients",
180                                      idlib.shortnodeid_b2a(clientid))
181             fileutil.make_dirs(clientdir)
182             f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
183             f.write("[node]\n")
184             f.write("nickname = client-%d\n" % i)
185             f.write("web.port = tcp:0:interface=127.0.0.1\n")
186             f.write("[storage]\n")
187             f.write("enabled = false\n")
188             f.close()
189             c = None
190             if i in client_config_hooks:
191                 # this hook can either modify tahoe.cfg, or return an
192                 # entirely new Client instance
193                 c = client_config_hooks[i](clientdir)
194             if not c:
195                 c = NoNetworkClient(clientdir)
196             c.nodeid = clientid
197             c.short_nodeid = b32encode(clientid).lower()[:8]
198             c._servers = self.all_servers # can be updated later
199             c.setServiceParent(self)
200             self.clients.append(c)
201
202     def make_server(self, i):
203         serverid = hashutil.tagged_hash("serverid", str(i))[:20]
204         serverdir = os.path.join(self.basedir, "servers",
205                                  idlib.shortnodeid_b2a(serverid))
206         fileutil.make_dirs(serverdir)
207         ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats())
208         return ss
209
210     def add_server(self, i, ss):
211         # to deal with the fact that all StorageServers are named 'storage',
212         # we interpose a middleman
213         middleman = service.MultiService()
214         middleman.setServiceParent(self)
215         ss.setServiceParent(middleman)
216         serverid = ss.my_nodeid
217         self.servers_by_number[i] = ss
218         self.servers_by_id[serverid] = wrap_storage_server(ss)
219         self.all_servers = frozenset(self.servers_by_id.items())
220         for c in self.clients:
221             c._servers = self.all_servers
222
223 class GridTestMixin:
224     def setUp(self):
225         self.s = service.MultiService()
226         self.s.startService()
227
228     def tearDown(self):
229         return self.s.stopService()
230
231     def set_up_grid(self, num_clients=1, num_servers=10,
232                     client_config_hooks={}):
233         # self.basedir must be set
234         self.g = NoNetworkGrid(self.basedir,
235                                num_clients=num_clients,
236                                num_servers=num_servers,
237                                client_config_hooks=client_config_hooks)
238         self.g.setServiceParent(self.s)
239         self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
240                                 for c in self.g.clients]
241         self.client_baseurls = ["http://localhost:%d/" % p
242                                 for p in self.client_webports]
243
244     def get_clientdir(self, i=0):
245         return self.g.clients[i].basedir
246
247     def get_serverdir(self, i):
248         return self.g.servers_by_number[i].storedir
249
250     def iterate_servers(self):
251         for i in sorted(self.g.servers_by_number.keys()):
252             ss = self.g.servers_by_number[i]
253             yield (i, ss, ss.storedir)
254
255     def find_shares(self, uri):
256         si = tahoe_uri.from_string(uri).get_storage_index()
257         prefixdir = storage_index_to_dir(si)
258         shares = []
259         for i,ss in self.g.servers_by_number.items():
260             serverid = ss.my_nodeid
261             basedir = os.path.join(ss.storedir, "shares", prefixdir)
262             if not os.path.exists(basedir):
263                 continue
264             for f in os.listdir(basedir):
265                 try:
266                     shnum = int(f)
267                     shares.append((shnum, serverid, os.path.join(basedir, f)))
268                 except ValueError:
269                     pass
270         return sorted(shares)
271
272     def delete_share(self, (shnum, serverid, sharefile)):
273         os.unlink(sharefile)
274
275     def delete_shares_numbered(self, uri, shnums):
276         for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
277             if i_shnum in shnums:
278                 os.unlink(i_sharefile)
279
280     def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
281         sharedata = open(sharefile, "rb").read()
282         corruptdata = corruptor_function(sharedata)
283         open(sharefile, "wb").write(corruptdata)
284
285     def corrupt_shares_numbered(self, uri, shnums, corruptor):
286         for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
287             if i_shnum in shnums:
288                 sharedata = open(i_sharefile, "rb").read()
289                 corruptdata = corruptor(sharedata)
290                 open(i_sharefile, "wb").write(corruptdata)
291
292     def GET(self, urlpath, followRedirect=False, return_response=False,
293             method="GET", clientnum=0, **kwargs):
294         # if return_response=True, this fires with (data, statuscode,
295         # respheaders) instead of just data.
296         assert not isinstance(urlpath, unicode)
297         url = self.client_baseurls[clientnum] + urlpath
298         factory = HTTPClientGETFactory(url, method=method,
299                                        followRedirect=followRedirect, **kwargs)
300         reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
301         d = factory.deferred
302         def _got_data(data):
303             return (data, factory.status, factory.response_headers)
304         if return_response:
305             d.addCallback(_got_data)
306         return factory.deferred