]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/no_network.py
break storage.py into smaller pieces in storage/*.py . No behavioral changes.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / no_network.py
1
2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster: on my laptop, it takes 50-80ms to start up,
9 # whereas SystemTestMixin takes close to 2s.
10
11 # This should be useful for tests which want to examine and/or manipulate the
12 # uploaded shares, checker/verifier/repairer tests, etc. The clients have no
13 # Tubs, so it is not useful for tests that involve a Helper, a KeyGenerator,
14 # or the control.furl .
15
16 import os.path
17 import sha
18 from twisted.application import service
19 from foolscap import Referenceable
20 from foolscap.eventual import fireEventually
21 from base64 import b32encode
22 from allmydata import uri as tahoe_uri
23 from allmydata.client import Client
24 from allmydata.storage.server import StorageServer, storage_index_to_dir
25 from allmydata.util import fileutil, idlib, hashutil, rrefutil
26 from allmydata.introducer.client import RemoteServiceConnector
27
class IntentionalError(Exception):
    """Raised by LocalWrapper when a wrapped server has been deliberately
    marked as broken, to simulate a misbehaving remote server."""
30
class Marker:
    """Opaque token handed out by LocalWrapper.notifyOnDisconnect and
    accepted back by dontNotifyOnDisconnect."""
33
class LocalWrapper:
    """Wrap a local object so that it can stand in for a foolscap
    RemoteReference: calls are delivered via an eventual-send to the
    'remote_*' methods of the original object, without any network.
    """
    def __init__(self, original):
        self.original = original
        # When True, every subsequent call errbacks with IntentionalError
        # instead of being delivered: lets tests simulate a broken server.
        self.broken = False
        # Optional callable f(res, methname), run after each successful call.
        self.post_call_notifier = None
        self.disconnectors = {}

    def callRemoteOnly(self, methname, *args, **kwargs):
        """Fire-and-forget variant of callRemote: schedule the call but
        return None instead of the Deferred, mirroring foolscap's
        callRemoteOnly semantics."""
        # the Deferred is intentionally discarded (was previously bound to
        # an unused local)
        self.callRemote(methname, *args, **kwargs)
        return None

    def callRemote(self, methname, *args, **kwargs):
        # this is ideally a Membrane, but that's too hard. We do a shallow
        # wrapping of inbound arguments, and per-methodname wrapping of
        # selected return values.
        def wrap(a):
            if isinstance(a, Referenceable):
                return LocalWrapper(a)
            else:
                return a
        args = tuple([wrap(a) for a in args])
        kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
        def _call():
            if self.broken:
                raise IntentionalError("I was asked to break")
            meth = getattr(self.original, "remote_" + methname)
            return meth(*args, **kwargs)
        # fireEventually gives the same turn-based scheduling a real
        # foolscap call would have
        d = fireEventually()
        d.addCallback(lambda res: _call())
        def _return_membrane(res):
            # rather than complete the difficult task of building a
            # fully-general Membrane (which would locate all Referenceable
            # objects that cross the simulated wire and replace them with
            # wrappers), we special-case certain methods that we happen to
            # know will return Referenceables.
            if methname == "allocate_buckets":
                (alreadygot, allocated) = res
                for shnum in allocated:
                    allocated[shnum] = LocalWrapper(allocated[shnum])
            if methname == "get_buckets":
                for shnum in res:
                    res[shnum] = LocalWrapper(res[shnum])
            return res
        d.addCallback(_return_membrane)
        if self.post_call_notifier:
            d.addCallback(self.post_call_notifier, methname)
        return d

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """Record f(*args, **kwargs) to run at disconnect; returns a marker
        for dontNotifyOnDisconnect. There is no network here, so these
        callbacks never actually fire."""
        m = Marker()
        self.disconnectors[m] = (f, args, kwargs)
        return m
    def dontNotifyOnDisconnect(self, marker):
        del self.disconnectors[marker]
88
def wrap(original, service_name):
    # The code in immutable.checker insists upon asserting the truth of
    # isinstance(rref, rrefutil.WrappedRemoteReference). Much of the
    # upload/download code uses rref.version (which normally comes from
    # rrefutil.VersionedRemoteReference). To avoid using a network, we want a
    # LocalWrapper here. Try to satisfy all these constraints at the same
    # time.
    wrapped = rrefutil.WrappedRemoteReference(LocalWrapper(original))
    try:
        wrapped.version = original.remote_get_version()
    except AttributeError:
        # server does not implement remote_get_version: fall back to the
        # advertised defaults for this service type
        wrapped.version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
    return wrapped
104
class NoNetworkClient(Client):
    """A Client with every network-touching initializer stubbed out, for
    use inside a NoNetworkGrid. Server connections are replaced by the
    static self._servers list installed by the grid."""

    def create_tub(self):
        pass
    def init_introducer_client(self):
        pass
    def setup_logging(self):
        pass
    def startService(self):
        # bypass Client.startService, which expects a Tub
        service.MultiService.startService(self)
    def stopService(self):
        service.MultiService.stopService(self)
    def when_tub_ready(self):
        raise RuntimeError("NoNetworkClient has no Tub")
    def init_control(self):
        pass
    def init_helper(self):
        pass
    def init_key_gen(self):
        pass
    def init_storage(self):
        pass
    def init_stub_client(self):
        pass

    def get_servers(self, service_name):
        # every service_name is answered from the same static list
        return self._servers

    def get_permuted_peers(self, service_name, key):
        # same permuted ordering that the real client derives from the
        # storage-index key
        def _permuted_order(server):
            (peerid, rref) = server
            return sha.new(key + peerid).digest()
        return sorted(self._servers, key=_permuted_order)

    def get_nickname_for_peerid(self, peerid):
        return None
137
138
class NoNetworkGrid(service.MultiService):
    """A grid of num_servers StorageServers and num_clients
    NoNetworkClients, all living in a single process with no network.

    client_config_hooks maps a client number to a function which is given
    that client's basedir; the hook may edit tahoe.cfg in place, or return
    a replacement Client instance to use instead of a NoNetworkClient.
    """
    def __init__(self, basedir, num_clients=1, num_servers=10,
                 client_config_hooks=None):
        service.MultiService.__init__(self)
        # use a None sentinel instead of a mutable {} default argument
        if client_config_hooks is None:
            client_config_hooks = {}
        self.basedir = basedir
        fileutil.make_dirs(basedir)

        self.servers_by_number = {}
        self.servers_by_id = {}
        self.clients = []

        for i in range(num_servers):
            # deterministic serverids so test runs are reproducible
            serverid = hashutil.tagged_hash("serverid", str(i))[:20]
            serverdir = os.path.join(basedir, "servers",
                                     idlib.shortnodeid_b2a(serverid))
            fileutil.make_dirs(serverdir)
            ss = StorageServer(serverdir)
            self.add_server(i, serverid, ss)

        for i in range(num_clients):
            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
            clientdir = os.path.join(basedir, "clients",
                                     idlib.shortnodeid_b2a(clientid))
            fileutil.make_dirs(clientdir)
            # write a minimal tahoe.cfg; close the handle even if a write
            # fails partway through
            f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
            try:
                f.write("[node]\n")
                f.write("nickname = client-%d\n" % i)
                f.write("web.port = tcp:0:interface=127.0.0.1\n")
                f.write("[storage]\n")
                f.write("enabled = false\n")
            finally:
                f.close()
            c = None
            if i in client_config_hooks:
                # this hook can either modify tahoe.cfg, or return an
                # entirely new Client instance
                c = client_config_hooks[i](clientdir)
            if not c:
                c = NoNetworkClient(clientdir)
            c.nodeid = clientid
            c.short_nodeid = b32encode(clientid).lower()[:8]
            c._servers = self.all_servers # can be updated later
            c.setServiceParent(self)
            self.clients.append(c)

    def add_server(self, i, serverid, ss):
        """Register StorageServer ss as server number i and refresh the
        server list seen by all existing clients."""
        # TODO: ss.setServiceParent(self), but first remove the goofy
        # self.parent.nodeid from Storage.startService . At the moment,
        # Storage doesn't really need to be startService'd, but it will in
        # the future.
        ss.setNodeID(serverid)
        self.servers_by_number[i] = ss
        self.servers_by_id[serverid] = wrap(ss, "storage")
        self.all_servers = frozenset(self.servers_by_id.items())
        for c in self.clients:
            c._servers = self.all_servers
194
class GridTestMixin:
    """Mixin for unit tests that want a no-network Tahoe grid. The test
    class must set self.basedir before calling set_up_grid()."""

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        return self.s.stopService()

    def set_up_grid(self, num_clients=1, client_config_hooks=None):
        """Create a NoNetworkGrid under self.basedir (which the caller
        must have set) and record each client's webish port/base URL."""
        # use a None sentinel instead of a mutable {} default argument
        if client_config_hooks is None:
            client_config_hooks = {}
        self.g = NoNetworkGrid(self.basedir, num_clients=num_clients,
                               client_config_hooks=client_config_hooks)
        self.g.setServiceParent(self.s)
        self.client_webports = [c.getServiceNamed("webish").listener._port.getHost().port
                                for c in self.g.clients]
        self.client_baseurls = ["http://localhost:%d/" % p
                                for p in self.client_webports]

    def get_clientdir(self, i=0):
        return self.g.clients[i].basedir

    def get_serverdir(self, i):
        return self.g.servers_by_number[i].storedir

    def iterate_servers(self):
        """Yield (index, StorageServer, storedir) for each server, in
        server-number order."""
        for i in sorted(self.g.servers_by_number.keys()):
            ss = self.g.servers_by_number[i]
            yield (i, ss, ss.storedir)

    def find_shares(self, uri):
        """Return a sorted list of (shnum, serverid, sharefile-path) for
        every share of the given URI currently on any server's disk."""
        si = tahoe_uri.from_string(uri).get_storage_index()
        prefixdir = storage_index_to_dir(si)
        shares = []
        for i,ss in self.g.servers_by_number.items():
            serverid = ss.my_nodeid
            basedir = os.path.join(ss.storedir, "shares", prefixdir)
            if not os.path.exists(basedir):
                continue
            for f in os.listdir(basedir):
                # non-numeric entries (e.g. scratch directories) are not
                # shares; narrow the try block to just the parse so other
                # errors are not silently swallowed
                try:
                    shnum = int(f)
                except ValueError:
                    continue
                shares.append((shnum, serverid, os.path.join(basedir, f)))
        return sorted(shares)
240