]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/no_network.py
start to factor server-connection-management into a distinct 'StorageServerFarmBroker...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / no_network.py
1
2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
10
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
15
16 import os.path
17 import sha
18 from twisted.application import service
19 from twisted.internet import reactor
20 from twisted.python.failure import Failure
21 from foolscap.api import Referenceable, fireEventually, RemoteException
22 from base64 import b32encode
23 from allmydata import uri as tahoe_uri
24 from allmydata.client import Client
25 from allmydata.storage.server import StorageServer, storage_index_to_dir
26 from allmydata.util import fileutil, idlib, hashutil
27 from allmydata.introducer.client import RemoteServiceConnector
28 from allmydata.test.common_web import HTTPClientGETFactory
29
30 class IntentionalError(Exception):
31     pass
32
33 class Marker:
34     pass
35
36 class LocalWrapper:
37     def __init__(self, original):
38         self.original = original
39         self.broken = False
40         self.post_call_notifier = None
41         self.disconnectors = {}
42
43     def callRemoteOnly(self, methname, *args, **kwargs):
44         d = self.callRemote(methname, *args, **kwargs)
45         return None
46
47     def callRemote(self, methname, *args, **kwargs):
48         # this is ideally a Membrane, but that's too hard. We do a shallow
49         # wrapping of inbound arguments, and per-methodname wrapping of
50         # selected return values.
51         def wrap(a):
52             if isinstance(a, Referenceable):
53                 return LocalWrapper(a)
54             else:
55                 return a
56         args = tuple([wrap(a) for a in args])
57         kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
58         def _call():
59             if self.broken:
60                 raise IntentionalError("I was asked to break")
61             meth = getattr(self.original, "remote_" + methname)
62             return meth(*args, **kwargs)
63         d = fireEventually()
64         d.addCallback(lambda res: _call())
65         def _wrap_exception(f):
66             return Failure(RemoteException(f))
67         d.addErrback(_wrap_exception)
68         def _return_membrane(res):
69             # rather than complete the difficult task of building a
70             # fully-general Membrane (which would locate all Referenceable
71             # objects that cross the simulated wire and replace them with
72             # wrappers), we special-case certain methods that we happen to
73             # know will return Referenceables.
74             if methname == "allocate_buckets":
75                 (alreadygot, allocated) = res
76                 for shnum in allocated:
77                     allocated[shnum] = LocalWrapper(allocated[shnum])
78             if methname == "get_buckets":
79                 for shnum in res:
80                     res[shnum] = LocalWrapper(res[shnum])
81             return res
82         d.addCallback(_return_membrane)
83         if self.post_call_notifier:
84             d.addCallback(self.post_call_notifier, methname)
85         return d
86
87     def notifyOnDisconnect(self, f, *args, **kwargs):
88         m = Marker()
89         self.disconnectors[m] = (f, args, kwargs)
90         return m
91     def dontNotifyOnDisconnect(self, marker):
92         del self.disconnectors[marker]
93
94 def wrap(original, service_name):
95     # Much of the upload/download code uses rref.version (which normally
96     # comes from rrefutil.add_version_to_remote_reference). To avoid using a
97     # network, we want a LocalWrapper here. Try to satisfy all these
98     # constraints at the same time.
99     wrapper = LocalWrapper(original)
100     try:
101         version = original.remote_get_version()
102     except AttributeError:
103         version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
104     wrapper.version = version
105     return wrapper
106
107 class NoNetworkStorageBroker:
108     def get_servers(self, key):
109         return sorted(self.client._servers,
110                       key=lambda x: sha.new(key+x[0]).digest())
111
112 class NoNetworkClient(Client):
113
114     def create_tub(self):
115         pass
116     def init_introducer_client(self):
117         pass
118     def setup_logging(self):
119         pass
120     def startService(self):
121         service.MultiService.startService(self)
122     def stopService(self):
123         service.MultiService.stopService(self)
124     def when_tub_ready(self):
125         raise NotImplementedError("NoNetworkClient has no Tub")
126     def init_control(self):
127         pass
128     def init_helper(self):
129         pass
130     def init_key_gen(self):
131         pass
132     def init_storage(self):
133         pass
134     def init_client_storage_broker(self):
135         self.storage_broker = NoNetworkStorageBroker()
136         self.storage_broker.client = self
137     def init_stub_client(self):
138         pass
139
140     def get_servers(self, service_name):
141         return self._servers
142
143     def get_nickname_for_serverid(self, serverid):
144         return None
145
146 class SimpleStats:
147     def __init__(self):
148         self.counters = {}
149         self.stats_producers = []
150
151     def count(self, name, delta=1):
152         val = self.counters.setdefault(name, 0)
153         self.counters[name] = val + delta
154
155     def register_producer(self, stats_producer):
156         self.stats_producers.append(stats_producer)
157
158     def get_stats(self):
159         stats = {}
160         for sp in self.stats_producers:
161             stats.update(sp.get_stats())
162         ret = { 'counters': self.counters, 'stats': stats }
163         return ret
164
165 class NoNetworkGrid(service.MultiService):
166     def __init__(self, basedir, num_clients=1, num_servers=10,
167                  client_config_hooks={}):
168         service.MultiService.__init__(self)
169         self.basedir = basedir
170         fileutil.make_dirs(basedir)
171
172         self.servers_by_number = {}
173         self.servers_by_id = {}
174         self.clients = []
175
176         for i in range(num_servers):
177             ss = self.make_server(i)
178             self.add_server(i, ss)
179
180         for i in range(num_clients):
181             clientid = hashutil.tagged_hash("clientid", str(i))[:20]
182             clientdir = os.path.join(basedir, "clients",
183                                      idlib.shortnodeid_b2a(clientid))
184             fileutil.make_dirs(clientdir)
185             f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
186             f.write("[node]\n")
187             f.write("nickname = client-%d\n" % i)
188             f.write("web.port = tcp:0:interface=127.0.0.1\n")
189             f.write("[storage]\n")
190             f.write("enabled = false\n")
191             f.close()
192             c = None
193             if i in client_config_hooks:
194                 # this hook can either modify tahoe.cfg, or return an
195                 # entirely new Client instance
196                 c = client_config_hooks[i](clientdir)
197             if not c:
198                 c = NoNetworkClient(clientdir)
199             c.nodeid = clientid
200             c.short_nodeid = b32encode(clientid).lower()[:8]
201             c._servers = self.all_servers # can be updated later
202             c.setServiceParent(self)
203             self.clients.append(c)
204
205     def make_server(self, i):
206         serverid = hashutil.tagged_hash("serverid", str(i))[:20]
207         serverdir = os.path.join(self.basedir, "servers",
208                                  idlib.shortnodeid_b2a(serverid))
209         fileutil.make_dirs(serverdir)
210         ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats())
211         return ss
212
213     def add_server(self, i, ss):
214         # to deal with the fact that all StorageServers are named 'storage',
215         # we interpose a middleman
216         middleman = service.MultiService()
217         middleman.setServiceParent(self)
218         ss.setServiceParent(middleman)
219         serverid = ss.my_nodeid
220         self.servers_by_number[i] = ss
221         self.servers_by_id[serverid] = wrap(ss, "storage")
222         self.all_servers = frozenset(self.servers_by_id.items())
223         for c in self.clients:
224             c._servers = self.all_servers
225
226 class GridTestMixin:
227     def setUp(self):
228         self.s = service.MultiService()
229         self.s.startService()
230
231     def tearDown(self):
232         return self.s.stopService()
233
234     def set_up_grid(self, num_clients=1, num_servers=10,
235                     client_config_hooks={}):
236         # self.basedir must be set
237         self.g = NoNetworkGrid(self.basedir,
238                                num_clients=num_clients,
239                                num_servers=num_servers,
240                                client_config_hooks=client_config_hooks)
241         self.g.setServiceParent(self.s)
242         self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
243                                 for c in self.g.clients]
244         self.client_baseurls = ["http://localhost:%d/" % p
245                                 for p in self.client_webports]
246
247     def get_clientdir(self, i=0):
248         return self.g.clients[i].basedir
249
250     def get_serverdir(self, i):
251         return self.g.servers_by_number[i].storedir
252
253     def iterate_servers(self):
254         for i in sorted(self.g.servers_by_number.keys()):
255             ss = self.g.servers_by_number[i]
256             yield (i, ss, ss.storedir)
257
258     def find_shares(self, uri):
259         si = tahoe_uri.from_string(uri).get_storage_index()
260         prefixdir = storage_index_to_dir(si)
261         shares = []
262         for i,ss in self.g.servers_by_number.items():
263             serverid = ss.my_nodeid
264             basedir = os.path.join(ss.storedir, "shares", prefixdir)
265             if not os.path.exists(basedir):
266                 continue
267             for f in os.listdir(basedir):
268                 try:
269                     shnum = int(f)
270                     shares.append((shnum, serverid, os.path.join(basedir, f)))
271                 except ValueError:
272                     pass
273         return sorted(shares)
274
275     def delete_share(self, (shnum, serverid, sharefile)):
276         os.unlink(sharefile)
277
278     def delete_shares_numbered(self, uri, shnums):
279         for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
280             if i_shnum in shnums:
281                 os.unlink(i_sharefile)
282
283     def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
284         sharedata = open(sharefile, "rb").read()
285         corruptdata = corruptor_function(sharedata)
286         open(sharefile, "wb").write(corruptdata)
287
288     def corrupt_shares_numbered(self, uri, shnums, corruptor):
289         for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
290             if i_shnum in shnums:
291                 sharedata = open(i_sharefile, "rb").read()
292                 corruptdata = corruptor(sharedata)
293                 open(i_sharefile, "wb").write(corruptdata)
294
295     def GET(self, urlpath, followRedirect=False, return_response=False,
296             method="GET", clientnum=0, **kwargs):
297         # if return_response=True, this fires with (data, statuscode,
298         # respheaders) instead of just data.
299         assert not isinstance(urlpath, unicode)
300         url = self.client_baseurls[clientnum] + urlpath
301         factory = HTTPClientGETFactory(url, method=method,
302                                        followRedirect=followRedirect, **kwargs)
303         reactor.connectTCP("localhost", self.client_webports[clientnum],factory)
304         d = factory.deferred
305         def _got_data(data):
306             return (data, factory.status, factory.response_headers)
307         if return_response:
308             d.addCallback(_got_data)
309         return factory.deferred