2 # This contains a test harness that creates a full Tahoe grid in a single
3 # process (actually in a single MultiService) which does not use the network.
4 # It does not use an Introducer, and there are no foolscap Tubs. Each storage
5 # server puts real shares on disk, but is accessed through loopback
6 # RemoteReferences instead of over serialized SSL. It is not as complete as
7 # the common.SystemTestMixin framework (which does use the network), but
8 # should be considerably faster. It should be useful for tests which want to
9 # examine and/or manipulate the uploaded shares, checker/verifier/repairer
10 # tests, etc. The clients have no Tubs, so it is not useful for tests that
11 # involve a Helper, a KeyGenerator, or the control.furl .
15 from twisted.application import service
16 from foolscap import Referenceable
17 from foolscap.eventual import fireEventually
18 from base64 import b32encode
19 from allmydata.client import Client
20 from allmydata.storage import StorageServer
21 from allmydata.util import fileutil, idlib, hashutil, rrefutil
22 from allmydata.introducer.client import RemoteServiceConnector
24 class IntentionalError(Exception):
31 def __init__(self, original):
32 self.original = original
34 self.post_call_notifier = None
35 self.disconnectors = {}
37 def callRemoteOnly(self, methname, *args, **kwargs):
38 d = self.callRemote(methname, *args, **kwargs)
41 def callRemote(self, methname, *args, **kwargs):
42 # this is ideally a Membrane, but that's too hard. We do a shallow
43 # wrapping of inbound arguments, and per-methodname wrapping of
44 # selected return values.
46 if isinstance(a, Referenceable):
47 return LocalWrapper(a)
50 args = tuple([wrap(a) for a in args])
51 kwargs = dict([(k,wrap(kwargs[k])) for k in kwargs])
54 raise IntentionalError("I was asked to break")
55 meth = getattr(self.original, "remote_" + methname)
56 return meth(*args, **kwargs)
58 d.addCallback(lambda res: _call())
59 def _return_membrane(res):
60 # rather than complete the difficult task of building a
61 # fully-general Membrane (which would locate all Referenceable
62 # objects that cross the simulated wire and replace them with
63 # wrappers), we special-case certain methods that we happen to
64 # know will return Referenceables.
65 if methname == "allocate_buckets":
66 (alreadygot, allocated) = res
67 for shnum in allocated:
68 allocated[shnum] = LocalWrapper(allocated[shnum])
69 if methname == "get_buckets":
71 res[shnum] = LocalWrapper(res[shnum])
73 d.addCallback(_return_membrane)
74 if self.post_call_notifier:
75 d.addCallback(self.post_call_notifier, methname)
78 def notifyOnDisconnect(self, f, *args, **kwargs):
80 self.disconnectors[m] = (f, args, kwargs)
def dontNotifyOnDisconnect(self, marker):
    """Forget the disconnect callback that was registered under `marker`.

    `marker` is the key under which the callback tuple is stored in
    self.disconnectors. Raises KeyError if nothing is registered under
    that marker.
    """
    registry = self.disconnectors
    del registry[marker]
85 def wrap(original, service_name):
86 # The code in immutable.checker insists upon asserting the truth of
87 # isinstance(rref, rrefutil.WrappedRemoteReference). Much of the
88 # upload/download code uses rref.version (which normally comes from
89 # rrefutil.VersionedRemoteReference). To avoid using a network, we want a
90 # LocalWrapper here. Try to satisfy all these constraints at the same
92 local = LocalWrapper(original)
93 wrapped = rrefutil.WrappedRemoteReference(local)
95 version = original.remote_get_version()
96 except AttributeError:
97 version = RemoteServiceConnector.VERSION_DEFAULTS[service_name]
98 wrapped.version = version
101 class NoNetworkClient(Client):
103 def create_tub(self):
105 def init_introducer_client(self):
107 def setup_logging(self):
def startService(self):
    """Start this node using only the plain MultiService behaviour.

    MultiService.startService is invoked directly on self, so any
    startService defined by the Client base class is not run.
    """
    multi = service.MultiService
    multi.startService(self)
def stopService(self):
    """Stop this node using only the plain MultiService behaviour.

    MultiService.stopService is invoked directly on self, so any
    stopService defined by the Client base class is not run.

    Returns whatever MultiService.stopService returns (in Twisted this may
    be a Deferred that fires once the child services have shut down), so
    that a parent service or a test tearDown can wait for shutdown to
    finish instead of racing ahead.
    """
    return service.MultiService.stopService(self)
def when_tub_ready(self):
    """Always fail: this client is built without a Tub.

    Raises RuntimeError unconditionally.
    """
    reason = "NoNetworkClient has no Tub"
    raise RuntimeError(reason)
115 def init_control(self):
117 def init_helper(self):
119 def init_key_gen(self):
121 def init_storage(self):
123 def init_stub_client(self):
126 def get_servers(self, service_name):
def get_permuted_peers(self, service_name, key):
    """Return the entries of self._servers in a key-dependent order.

    Each entry is ranked by the digest that the `sha` module computes over
    `key` concatenated with the entry's first element; the entries are
    returned as a new list sorted by that digest. `service_name` is
    accepted but not consulted.
    """
    def _rank(entry):
        return sha.new(key + entry[0]).digest()
    return sorted(self._servers, key=_rank)
133 class NoNetworkGrid(service.MultiService):
134 def __init__(self, basedir, num_clients=1, num_servers=10):
135 service.MultiService.__init__(self)
136 self.basedir = basedir
137 fileutil.make_dirs(basedir)
139 self.servers_by_number = {}
140 self.servers_by_id = {}
143 for i in range(num_servers):
144 serverid = hashutil.tagged_hash("serverid", str(i))[:20]
145 serverdir = os.path.join(basedir, "servers",
146 idlib.shortnodeid_b2a(serverid))
147 fileutil.make_dirs(serverdir)
148 ss = StorageServer(serverdir)
149 self.add_server(i, serverid, ss)
151 for i in range(num_clients):
152 clientid = hashutil.tagged_hash("clientid", str(i))[:20]
153 clientdir = os.path.join(basedir, "clients",
154 idlib.shortnodeid_b2a(clientid))
155 fileutil.make_dirs(clientdir)
156 f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
158 f.write("nickname = client-%d\n" % i)
159 f.write("web.port = tcp:0:interface=127.0.0.1\n")
160 f.write("[storage]\n")
161 f.write("enabled = false\n")
163 c = NoNetworkClient(clientdir)
165 c.short_nodeid = b32encode(clientid).lower()[:8]
166 c._servers = self.all_servers # can be updated later
167 c.setServiceParent(self)
168 self.clients.append(c)
def add_server(self, i, serverid, ss):
    """Register storage server `ss` as server number `i` with id `serverid`.

    The raw server is recorded in self.servers_by_number, a wrapped
    reference (built by wrap() for the "storage" service) is recorded in
    self.servers_by_id, and self.all_servers is rebuilt as a frozenset of
    the (serverid, wrapped server) pairs. Every client already present in
    self.clients is pointed at the new frozenset.
    """
    # TODO: ss.setServiceParent(self), but first remove the goofy
    # self.parent.nodeid from Storage.startService . At the moment,
    # Storage doesn't really need to be startService'd, but it will in
    # the future.
    ss.setNodeID(serverid)
    self.servers_by_number[i] = ss
    wrapped_server = wrap(ss, "storage")
    self.servers_by_id[serverid] = wrapped_server
    # frozenset is immutable, so a fresh one is built on every addition and
    # handed to each existing client.
    current = frozenset(self.servers_by_id.items())
    self.all_servers = current
    for client in self.clients:
        client._servers = current
184 self.s = service.MultiService()
185 self.s.startService()
188 return self.s.stopService()
def set_up_grid(self):
    """Create a NoNetworkGrid under self.basedir and attach it to self.s.

    self.basedir must already be set by the caller. The new grid is stored
    as self.g before being parented to the self.s service.
    """
    grid = NoNetworkGrid(self.basedir)
    self.g = grid
    grid.setServiceParent(self.s)
def get_clientdir(self, i=0):
    """Return the basedir of client number `i` (default: the first one).

    Raises IndexError if self.g.clients has no entry at index `i`.
    """
    client = self.g.clients[i]
    return client.basedir
def get_serverdir(self, i):
    """Return the storedir of the storage server registered as number `i`.

    Raises KeyError if self.g.servers_by_number has no server numbered `i`.
    """
    server = self.g.servers_by_number[i]
    return server.storedir
def iterate_servers(self):
    """Yield (number, server, storedir) for each server, by ascending number.

    The server numbers are the keys of self.g.servers_by_number; they are
    sorted once up front, and each server is looked up as it is yielded.
    """
    for number in sorted(self.g.servers_by_number):
        server = self.g.servers_by_number[number]
        yield number, server, server.storedir