3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage, downloadPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
# Test harness: builds a scratch area under basedir/test, hosts helper
# services in-process, and (further down) spawns one client node as a child
# process whose memory usage is recorded.
# NOTE(review): presumably testutil.PollMixin is what supplies self.poll(),
# which start_client() calls -- confirm.
15 class SystemFramework(testutil.PollMixin):
# Prepare a clean test directory and the parent service.
#
# basedir: working directory for the run; made absolute. Its absolute path
#          must start with the absolute path of the current directory,
#          otherwise AssertionError is raised.
# mode:    test mode string; "download" and "download-GET" switch off
#          share discarding.
# NOTE(review): other methods read self.mode, but no assignment to it is
# visible in this extract.
18 def __init__(self, basedir, mode):
19 self.basedir = basedir = os.path.abspath(basedir)
# The prefix check runs before the rmtree below, so a basedir outside the
# current directory is rejected before anything is deleted.
20 if not basedir.startswith(os.path.abspath(".")):
21 raise AssertionError("safety issue: basedir must be a subdir")
22 self.testdir = testdir = os.path.join(basedir, "test")
# Start every run from an empty test directory.
23 if os.path.exists(testdir):
24 shutil.rmtree(testdir)
25 fileutil.make_dirs(testdir)
# Parent service for everything the harness starts; started immediately.
26 self.sparent = service.MultiService()
27 self.sparent.startService()
# NOTE(review): foolscap.Tub is referenced here, but only
# "from foolscap import eventual" is visible among the imports.
29 self.tub = foolscap.Tub()
30 self.tub.setServiceParent(self.sparent)
# Shares are discarded by default; the download modes keep them.
31 self.discard_shares = True
33 if mode in ("download", "download-GET"):
34 self.discard_shares = False
# NOTE(review): the `def` line of this method is not present in this
# extract, and the startLogging(...) call below is missing its closing
# line.
# Twisted log output goes to the file "log" inside self.testdir.
38 log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
40 #logfile = open(os.path.join(self.testdir, "log"), "w")
41 #flo = log.FileLogObserver(logfile)
42 #log.startLoggingWithObserver(flo.emit, setStdout=False)
# Chain setUp() then do_test() on a Deferred from fireEventually();
# tearDown is attached with addBoth, so it receives either the result or
# the failure of the earlier steps.
43 d = eventual.fireEventually()
44 d.addCallback(lambda res: self.setUp())
45 d.addCallback(lambda res: self.do_test())
46 d.addBoth(self.tearDown)
# NOTE(review): presumably self.failed holds a Failure stored by an errback
# that is not visible here, and this line is guarded by a check -- confirm.
58 self.failed.raiseException()
# NOTE(review): the enclosing `def` line is not present in this extract;
# this looks like the body of setUp(), which the Deferred chain in the
# driver code invokes -- confirm.
# stats.out is opened in append mode, so results from earlier runs are kept.
63 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
64 d = self.make_introducer_and_vdrive()
# NOTE(review): this return presumably belongs to a nested callback whose
# `def` line is missing from the extract.
67 return self.start_client()
# Callback: remember the control FURL and ask the Tub for a reference to it.
69 def _record_control_furl(control_furl):
70 self.control_furl = control_furl
71 #print "OBTAINING '%s'" % (control_furl,)
72 return self.tub.getReference(self.control_furl)
73 d.addCallback(_record_control_furl)
# Callback: keep the remote control reference, then invoke its
# "wait_for_client_connections" remote method (the remaining arguments of
# that call are not visible in this extract).
74 def _record_control(control_rref):
75 self.control_rref = control_rref
76 return control_rref.callRemote("wait_for_client_connections",
78 d.addCallback(_record_control)
# Shut the harness down and pass the incoming result through unchanged.
#
# passthrough: whatever the preceding Deferred step delivered (this method
#              is attached with addBoth, so it may be a result or a
#              failure); the final callback returns it as-is.
# Steps, in order: kill the client process, stop the parent service, flush
# the eventual-send queue, close the stats file.
# NOTE(review): no `return d` is visible in this extract -- confirm the
# Deferred is returned to the caller.
85 def tearDown(self, passthrough):
86 # the client node will shut down in a few seconds
87 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
88 log.msg("shutting down SystemTest services")
# Start from an already-fired Deferred so each step can be chained on it.
89 d = defer.succeed(None)
91 d.addCallback(lambda res: self.kill_client())
92 d.addCallback(lambda res: self.sparent.stopService())
93 d.addCallback(lambda res: eventual.flushEventualQueue())
94 def _close_statsfile(res):
95 self.statsfile.close()
96 d.addCallback(_close_statsfile)
# Replace the chain's result with the value tearDown was called with.
97 d.addCallback(lambda res: passthrough)
# Attach service `s` to the harness's parent service (self.sparent).
# NOTE(review): callers in this file assign the result of add_service(),
# but no return statement is visible in this extract -- confirm that the
# service is returned.
100 def add_service(self, s):
101 s.setServiceParent(self.sparent)
# Create the IntroducerAndVdrive service rooted at
# testdir/introducer_and_vdrive, register it through add_service(), and
# obtain the Deferred from its when_tub_ready().
# NOTE(review): neither the creation of iv_basedir nor a `return d` is
# visible in this extract -- confirm both exist.
104 def make_introducer_and_vdrive(self):
105 iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
107 iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
108 self.introducer_and_vdrive = self.add_service(iv)
109 d = self.introducer_and_vdrive.when_tub_ready()
# Create self.numnodes in-process client nodes, one directory per node
# (testdir/node0, testdir/node1, ...), each given the introducer and vdrive
# FURLs taken from the introducer_and_vdrive service.
# NOTE(review): self.numnodes is not defined in the visible lines, and the
# directory creation and file close() calls are not visible either.
112 def make_nodes(self):
113 q = self.introducer_and_vdrive
114 self.introducer_furl = q.urls["introducer"]
115 self.vdrive_furl = q.urls["vdrive"]
117 for i in range(self.numnodes):
118 nodedir = os.path.join(self.testdir, "node%d" % i)
# Per-node configuration files holding the two FURLs.
120 f = open(os.path.join(nodedir, "introducer.furl"), "w")
121 f.write(self.introducer_furl)
123 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
124 f.write(self.vdrive_furl)
126 if self.discard_shares:
127 # for this test, we tell the storage servers to throw out all
128 # their stored data, since we're only testing upload and not
130 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
131 f.write("no_storage\n")
# NOTE(review): _do_upload reads self.nodes[0]; presumably `c` is appended
# to self.nodes on a line missing from this extract -- confirm.
133 c = self.add_service(client.Client(basedir=nodedir))
135 # the peers will start running, eventually they will connect to each
136 # other and the introducer_and_vdrive
# Re-create the keepalive file at self.keepalive_file (opened with "w").
# start_client() calls this once and then every 4 seconds from a
# TimerService.
# NOTE(review): the prose lines below appear to be the text written into
# the file; the statement that writes them and the close() call are not
# visible in this extract. They are runtime content, not comments.
138 def touch_keepalive(self):
139 f = open(self.keepalive_file, "w")
141 If the node notices this file at startup, it will poll every 5 seconds and
142 terminate if the file is more than 10 seconds old, or if it has been deleted.
143 If the test harness has an internal failure and neglects to kill off the node
144 itself, this helps to avoid leaving processes lying around. The contents of
145 this file are ignored.
# Create the client-under-test in testdir/client, write its configuration
# files, launch it with twistd as a child process, and wait for its
# control.furl file to appear.
# NOTE(review): several names used below (quiet, pp, _check, _stall) have
# no visible definition in this extract; their defining lines appear to be
# missing, as do the close() calls for the files opened here.
149 def start_client(self):
150 # this returns a Deferred that fires with the client's control.furl
151 log.msg("MAKING CLIENT")
152 clientdir = self.clientdir = os.path.join(self.testdir, "client")
154 create_node.create_client(clientdir, {}, out=quiet)
155 log.msg("DONE MAKING CLIENT")
# Point the client at the introducer and vdrive created earlier.
156 f = open(os.path.join(clientdir, "introducer.furl"), "w")
157 f.write(self.introducer_furl + "\n")
159 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
160 f.write(self.vdrive_furl + "\n")
162 f = open(os.path.join(clientdir, "webport"), "w")
163 # TODO: ideally we would set webport=0 and then ask the node what
164 # port it picked. But at the moment it is not convenient to do this,
165 # so we just pick a relatively unique one.
# NOTE(review): os.getpid() can exceed 65535 on systems configured with a
# large pid range, which would not be a valid TCP port -- confirm/clamp.
166 webport = max(os.getpid(), 2000)
167 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
# Base URL used by POST() and GET_discard().
169 self.webish_url = "http://localhost:%d" % webport
# Marker files: debug_no_storage when shares are discarded, and
# push_to_ourselves for the three modes listed below.
170 if self.discard_shares:
171 f = open(os.path.join(clientdir, "debug_no_storage"), "w")
172 f.write("no_storage\n")
174 if self.mode in ("upload-self", "download", "download-GET"):
175 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
176 f.write("push_to_ourselves\n")
178 self.keepalive_file = os.path.join(clientdir,
179 "suicide_prevention_hotline")
180 # now start updating the mtime.
181 self.touch_keepalive()
# Touch the keepalive file every 4 seconds for as long as self.sparent runs.
182 ts = internet.TimerService(4.0, self.touch_keepalive)
183 ts.setServiceParent(self.sparent)
# The same Deferred is stored as self.proc_done (returned by kill_client)
# and as pp.d (fired by ClientWatcher.processEnded).
186 self.proc_done = pp.d = defer.Deferred()
187 logfile = os.path.join(self.basedir, "client.log")
# twistd is run with clientdir as its working directory; its log goes to
# client.log in basedir.
188 cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
189 env = os.environ.copy()
190 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
191 log.msg("CLIENT STARTED")
193 # now we wait for the client to get started. we're looking for the
194 # control.furl file to appear.
195 furl_file = os.path.join(clientdir, "control.furl")
# NOTE(review): the lines below are presumably the body of the _check
# helper passed to self.poll(); its `def` line is missing here.
197 if pp.ended and pp.ended.value.status != 0:
198 # the twistd process ends normally (with rc=0) if the child
199 # is successfully launched. It ends abnormally (with rc!=0)
200 # if the child cannot be launched.
201 raise RuntimeError("process ended while waiting for startup")
202 return os.path.exists(furl_file)
# Poll _check with an interval argument of 0.1 (presumably seconds).
203 d = self.poll(_check, 0.1)
204 # once it exists, wait a moment before we read from it, just in case
205 # it hasn't finished writing the whole thing. Ideally control.furl
206 # would be created in some atomic fashion, or made non-readable until
207 # it's ready, but I can't think of an easy way to do that, and I
208 # think the chances that we'll observe a half-write are pretty low.
# A 0.1-second timer fires d2 (presumably inside the missing _stall helper).
210 d2 = defer.Deferred()
211 reactor.callLater(0.1, d2.callback, None)
213 d.addCallback(_stall)
# NOTE(review): the code that reads furl_file and returns the Deferred is
# not visible in this extract.
215 f = open(furl_file, "r")
# Send SIGINT to the client process and return self.proc_done, the Deferred
# created in start_client().
# NOTE(review): the `try:` line and the body of the except clause are not
# visible in this extract; the except clause shows that an already-exited
# process (error.ProcessExitedAlready) is handled rather than propagated.
222 def kill_client(self):
223 # returns a Deferred that fires when the process exits. This may only
226 self.proc.signalProcess("INT")
227 except error.ProcessExitedAlready:
229 return self.proc_done
# Open testdir/NAME.data for binary writing.
# NOTE(review): the rest of the body (writing `size` bytes, closing the
# file, and the return value) is not visible in this extract; callers store
# the result in files[name] and later pass it to os.remove(), so it is
# presumably the filename -- confirm.
232 def create_data(self, name, size):
233 filename = os.path.join(self.testdir, name + ".data")
234 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the peak VM size of one measurement point.

    stats: mapping that contains a 'VmPeak' entry (formatted with %d).
    name:  label of the measurement point, e.g. "10kB".

    One "<mode> <name>: <VmPeak>" line is appended to self.statsfile and
    flushed right away; the same value is then stored in self.stats[name].
    Returns None.
    """
    peak = stats['VmPeak']
    record = "%s %s: %d\n" % (self.mode, name, peak)
    out = self.statsfile
    out.write(record)
    out.flush()
    self.stats[name] = peak
# Send a multipart/form-data POST to the client's web server.
#
# urlpath: path appended to self.webish_url.
# fields:  form fields as keyword arguments; a tuple value is unpacked as
#          (filename, contents) and sent with a filename attribute, any
#          other value is sent as a plain field.
# Returns the result of twisted's getPage(); redirects are not followed.
# NOTE(review): the lines that create `form`, add the boundary separators
# and append the field values are not visible in this extract.
247 def POST(self, urlpath, **fields):
248 url = self.webish_url + urlpath
# Fixed multipart boundary. NOTE(review): a payload that happens to contain
# this string would presumably corrupt the form body.
249 sepbase = "boogabooga"
253 form.append('Content-Disposition: form-data; name="_charset"')
# iteritems() is the Python 2 dict iterator.
257 for name, value in fields.iteritems():
258 if isinstance(value, tuple):
259 filename, value = value
260 form.append('Content-Disposition: form-data; name="%s"; '
261 'filename="%s"' % (name, filename))
263 form.append('Content-Disposition: form-data; name="%s"' % name)
# Form lines are joined with CRLF, and the body ends with a final CRLF.
268 body = "\r\n".join(form) + "\r\n"
269 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
271 return getPage(url, method="POST", postdata=body,
272 headers=headers, followRedirect=False)
# Fetch `urlpath` from the client's web server with twisted's downloadPage
# and return its result. Despite the name, the response body is written to
# the file dummy-get.out in self.basedir.
# The "?filename=..." query is appended unconditionally, so this assumes
# urlpath carries no query string of its own -- TODO confirm.
274 def GET_discard(self, urlpath):
276 url = self.webish_url + urlpath + "?filename=dummy-get.out"
277 return downloadPage(url, os.path.join(self.basedir, "dummy-get.out"))
# Ask the client (through its remote control reference) for its memory
# usage and print the VmSize / VmPeak figures.
# `res` is unused in the visible lines; it lets the method be called
# directly or used as a Deferred callback.
# NOTE(review): the `def _print(stats):` line, the end of the print
# statement and the returns are not visible in this extract. The next
# callback in the test sequence is stash_stats, so the stats mapping is
# presumably passed along -- confirm.
279 def _print_usage(self, res=None):
280 d = self.control_rref.callRemote("get_memory_usage")
282 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
285 d.addCallback(_print)
# Upload one file of `size` bytes using the strategy selected by self.mode.
#
# res:   result of the previous Deferred step (not used in the visible
#        lines).
# size:  size of the test data.
# files: dict in which the created data file is recorded under `name`.
# uris:  dict shared with _do_download (its use here is not visible).
# Raises RuntimeError for a mode it does not recognise.
# NOTE(review): the definitions of `name`, `data` and `_complete`, the
# `else:` line before the raise, and the return are not visible in this
# extract.
288 def _do_upload(self, res, size, files, uris):
291 print "uploading %s" % name
# "upload" / "upload-self": create a local file and have the client upload
# it through its remote control interface.
292 if self.mode in ("upload", "upload-self"):
293 files[name] = self.create_data(name, size)
294 d = self.control_rref.callRemote("upload_from_file_to_uri",
297 os.remove(files[name])
# "upload-POST": send the data through the client's web interface.
301 elif self.mode == "upload-POST":
303 url = "/vdrive/global"
304 d = self.POST(url, t="upload", file=("%d.data" % size, data))
305 elif self.mode in ("download", "download-GET"):
306 # upload the data from a local peer, then have the
307 # client-under-test download it.
308 files[name] = self.create_data(name, size)
309 u = self.nodes[0].getServiceNamed("uploader")
310 d = u.upload_filename(files[name])
312 raise RuntimeError("unknown mode=%s" % self.mode)
315 print "uploaded %s" % name
316 d.addCallback(_complete)
# Download step of the test sequence; only the "download" and
# "download-GET" modes are handled in the visible lines.
#
# res:  result of the previous Deferred step (not used in the visible lines).
# size: size of the data uploaded by _do_upload.
# uris: dict shared with _do_upload.
# NOTE(review): the body of the first `if` (presumably an early return for
# the other modes), the definition of `uri`, the remaining arguments of the
# callRemote call and the return are not visible in this extract.
319 def _do_download(self, res, size, uris):
320 if self.mode not in ("download", "download-GET"):
# "download": the client fetches the URI through its control interface.
324 if self.mode == "download":
325 d = self.control_rref.callRemote("download_from_uri_to_file",
# "download-GET": fetch /uri/URI over HTTP; the path is URL-quoted first.
327 if self.mode == "download-GET":
328 url = "/uri/%s" % uri
329 d = self.GET_discard(urllib.quote(url))
# NOTE(review): the enclosing `def` line is not present in this extract;
# this looks like the body of do_test(), which the driver's Deferred chain
# invokes -- confirm. The definitions of `files`, `uris` and `i` (presumably
# loop variables) are not visible either.
334 #print "CLIENT STARTED"
335 #print "FURL", self.control_furl
336 #print "RREF", self.control_rref
# Decimal units: 1 kB = 1000 bytes, 1 MB = 1000*1000 bytes.
338 kB = 1000; MB = 1000*1000
# Baseline measurement, taken before any transfer, recorded as "0B".
342 d = self._print_usage()
343 d.addCallback(self.stash_stats, "0B")
# Each size class repeats the same steps: upload, download (only acts in
# the download modes), print memory usage, record VmPeak under the label.
# Sizes are offset by +i.
346 d.addCallback(self._do_upload, 10*kB+i, files, uris)
347 d.addCallback(self._do_download, 10*kB+i, uris)
348 d.addCallback(self._print_usage)
349 d.addCallback(self.stash_stats, "10kB")
352 d.addCallback(self._do_upload, 10*MB+i, files, uris)
353 d.addCallback(self._do_download, 10*MB+i, uris)
354 d.addCallback(self._print_usage)
355 d.addCallback(self.stash_stats, "10MB")
358 d.addCallback(self._do_upload, 50*MB+i, files, uris)
359 d.addCallback(self._do_download, 50*MB+i, uris)
360 d.addCallback(self._print_usage)
361 d.addCallback(self.stash_stats, "50MB")
# The 100MB size class and the final stall are disabled.
364 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
365 # d.addCallback(self._do_download, 100*MB+i, uris)
366 # d.addCallback(self._print_usage)
367 #d.addCallback(self.stash_stats, "100MB")
369 #d.addCallback(self.stall)
# Deferred-callback helper that waits 5 seconds: a reactor timer fires
# d.callback(None) after the delay. `res` is not used in the visible lines.
# NOTE(review): the creation of `d` and its return are not visible in this
# extract.
375 def stall(self, res):
377 reactor.callLater(5, d.callback, None)
# ProcessProtocol attached to the spawned client process by start_client().
# NOTE(review): the bodies of outReceived/errReceived and part of
# processEnded are not visible in this extract. start_client() reads an
# `ended` attribute on this object, so presumably processEnded stores
# `reason` there -- confirm.
381 class ClientWatcher(protocol.ProcessProtocol):
# Called with data the child writes to stdout.
383 def outReceived(self, data):
# Called with data the child writes to stderr.
385 def errReceived(self, data):
# Called when the child exits; fires self.d (the Deferred that
# start_client() also stores as proc_done) with None.
387 def processEnded(self, reason):
389 self.d.callback(None)
# Script entry point: build the harness rooted at "_test_memory" with the
# selected mode.
# NOTE(review): the default value of `mode`, the line that takes it from
# sys.argv, and the call that starts the run are not visible in this
# extract.
392 if __name__ == '__main__':
394 if len(sys.argv) > 1:
396 # put the logfile and stats.out in _test_memory/ . These stick around.
397 # put the nodes and other files in _test_memory/test/ . These are
398 # removed each time we run.
399 sf = SystemFramework("_test_memory", mode)