3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage, downloadPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
15 class SystemFramework(testutil.PollMixin):
18 def __init__(self, basedir, mode):
19 self.basedir = basedir = os.path.abspath(basedir)
20 if not basedir.startswith(os.path.abspath(".")):
21 raise AssertionError("safety issue: basedir must be a subdir")
22 self.testdir = testdir = os.path.join(basedir, "test")
23 if os.path.exists(testdir):
24 shutil.rmtree(testdir)
25 fileutil.make_dirs(testdir)
26 self.sparent = service.MultiService()
27 self.sparent.startService()
29 self.tub = foolscap.Tub()
30 self.tub.setServiceParent(self.sparent)
31 self.discard_shares = True
33 if mode in ("download", "download-GET"):
34 self.discard_shares = False
38 log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
40 #logfile = open(os.path.join(self.testdir, "log"), "w")
41 #flo = log.FileLogObserver(logfile)
42 #log.startLoggingWithObserver(flo.emit, setStdout=False)
43 d = eventual.fireEventually()
44 d.addCallback(lambda res: self.setUp())
45 d.addCallback(lambda res: self.do_test())
46 d.addBoth(self.tearDown)
58 self.failed.raiseException()
63 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
64 d = self.make_introducer_and_vdrive()
67 return self.start_client()
69 def _record_control_furl(control_furl):
70 self.control_furl = control_furl
71 #print "OBTAINING '%s'" % (control_furl,)
72 return self.tub.getReference(self.control_furl)
73 d.addCallback(_record_control_furl)
74 def _record_control(control_rref):
75 self.control_rref = control_rref
76 return control_rref.callRemote("wait_for_client_connections",
78 d.addCallback(_record_control)
85 def tearDown(self, passthrough):
86 # the client node will shut down in a few seconds
87 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
88 log.msg("shutting down SystemTest services")
89 d = defer.succeed(None)
91 d.addCallback(lambda res: self.kill_client())
92 d.addCallback(lambda res: self.sparent.stopService())
93 d.addCallback(lambda res: eventual.flushEventualQueue())
94 def _close_statsfile(res):
95 self.statsfile.close()
96 d.addCallback(_close_statsfile)
97 d.addCallback(lambda res: passthrough)
100 def add_service(self, s):
101 s.setServiceParent(self.sparent)
104 def make_introducer_and_vdrive(self):
105 iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
107 iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
108 self.introducer_and_vdrive = self.add_service(iv)
109 d = self.introducer_and_vdrive.when_tub_ready()
112 def make_nodes(self):
113 q = self.introducer_and_vdrive
114 self.introducer_furl = q.urls["introducer"]
115 self.vdrive_furl = q.urls["vdrive"]
117 for i in range(self.numnodes):
118 nodedir = os.path.join(self.testdir, "node%d" % i)
120 f = open(os.path.join(nodedir, "introducer.furl"), "w")
121 f.write(self.introducer_furl)
123 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
124 f.write(self.vdrive_furl)
126 if self.discard_shares:
127 # for this test, we tell the storage servers to throw out all
128 # their stored data, since we're only testing upload and not
130 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
131 f.write("no_storage\n")
133 c = self.add_service(client.Client(basedir=nodedir))
135 # the peers will start running, eventually they will connect to each
136 # other and the introducer_and_vdrive
138 def touch_keepalive(self):
139 f = open(self.keepalive_file, "w")
141 If the node notices this file at startup, it will poll every 5 seconds and
142 terminate if the file is more than 10 seconds old, or if it has been deleted.
143 If the test harness has an internal failure and neglects to kill off the node
144 itself, this helps to avoid leaving processes lying around. The contents of
145 this file are ignored.
149 def start_client(self):
150 # this returns a Deferred that fires with the client's control.furl
151 log.msg("MAKING CLIENT")
152 clientdir = self.clientdir = os.path.join(self.testdir, "client")
154 create_node.create_client(clientdir, {}, out=quiet)
155 log.msg("DONE MAKING CLIENT")
156 f = open(os.path.join(clientdir, "introducer.furl"), "w")
157 f.write(self.introducer_furl + "\n")
159 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
160 f.write(self.vdrive_furl + "\n")
162 f = open(os.path.join(clientdir, "webport"), "w")
163 # TODO: ideally we would set webport=0 and then ask the node what
164 # port it picked. But at the moment it is not convenient to do this,
165 # so we just pick a relatively unique one.
166 webport = max(os.getpid(), 2000)
167 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
169 self.webish_url = "http://localhost:%d" % webport
170 if self.discard_shares:
171 f = open(os.path.join(clientdir, "debug_no_storage"), "w")
172 f.write("no_storage\n")
174 if self.mode in ("upload-self"):
175 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
176 f.write("push_to_ourselves\n")
179 f = open(os.path.join(clientdir, "sizelimit"), "w")
182 self.keepalive_file = os.path.join(clientdir,
183 "suicide_prevention_hotline")
184 # now start updating the mtime.
185 self.touch_keepalive()
186 ts = internet.TimerService(4.0, self.touch_keepalive)
187 ts.setServiceParent(self.sparent)
190 self.proc_done = pp.d = defer.Deferred()
191 logfile = os.path.join(self.basedir, "client.log")
192 cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
193 env = os.environ.copy()
194 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
195 log.msg("CLIENT STARTED")
197 # now we wait for the client to get started. we're looking for the
198 # control.furl file to appear.
199 furl_file = os.path.join(clientdir, "control.furl")
201 if pp.ended and pp.ended.value.status != 0:
202 # the twistd process ends normally (with rc=0) if the child
203 # is successfully launched. It ends abnormally (with rc!=0)
204 # if the child cannot be launched.
205 raise RuntimeError("process ended while waiting for startup")
206 return os.path.exists(furl_file)
207 d = self.poll(_check, 0.1)
208 # once it exists, wait a moment before we read from it, just in case
209 # it hasn't finished writing the whole thing. Ideally control.furl
210 # would be created in some atomic fashion, or made non-readable until
211 # it's ready, but I can't think of an easy way to do that, and I
212 # think the chances that we'll observe a half-write are pretty low.
214 d2 = defer.Deferred()
215 reactor.callLater(0.1, d2.callback, None)
217 d.addCallback(_stall)
219 f = open(furl_file, "r")
226 def kill_client(self):
227 # returns a Deferred that fires when the process exits. This may only
230 self.proc.signalProcess("INT")
231 except error.ProcessExitedAlready:
233 return self.proc_done
236 def create_data(self, name, size):
237 filename = os.path.join(self.testdir, name + ".data")
238 f = open(filename, "wb")
246 def stash_stats(self, stats, name):
247 self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
248 self.statsfile.flush()
249 self.stats[name] = stats['VmPeak']
251 def POST(self, urlpath, **fields):
252 url = self.webish_url + urlpath
253 sepbase = "boogabooga"
257 form.append('Content-Disposition: form-data; name="_charset"')
261 for name, value in fields.iteritems():
262 if isinstance(value, tuple):
263 filename, value = value
264 form.append('Content-Disposition: form-data; name="%s"; '
265 'filename="%s"' % (name, filename))
267 form.append('Content-Disposition: form-data; name="%s"' % name)
272 body = "\r\n".join(form) + "\r\n"
273 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
275 return getPage(url, method="POST", postdata=body,
276 headers=headers, followRedirect=False)
278 def GET_discard(self, urlpath):
280 url = self.webish_url + urlpath + "?filename=dummy-get.out"
281 return downloadPage(url, os.path.join(self.basedir, "dummy-get.out"))
283 def _print_usage(self, res=None):
284 d = self.control_rref.callRemote("get_memory_usage")
286 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
289 d.addCallback(_print)
292 def _do_upload(self, res, size, files, uris):
295 print "uploading %s" % name
296 if self.mode in ("upload", "upload-self"):
297 files[name] = self.create_data(name, size)
298 d = self.control_rref.callRemote("upload_from_file_to_uri",
301 os.remove(files[name])
305 elif self.mode == "upload-POST":
307 url = "/vdrive/global"
308 d = self.POST(url, t="upload", file=("%d.data" % size, data))
309 elif self.mode in ("download", "download-GET"):
310 # upload the data from a local peer, then have the
311 # client-under-test download it.
312 files[name] = self.create_data(name, size)
313 u = self.nodes[0].getServiceNamed("uploader")
314 d = u.upload_filename(files[name])
316 raise RuntimeError("unknown mode=%s" % self.mode)
319 print "uploaded %s" % name
320 d.addCallback(_complete)
323 def _do_download(self, res, size, uris):
324 if self.mode not in ("download", "download-GET"):
328 if self.mode == "download":
329 d = self.control_rref.callRemote("download_from_uri_to_file",
331 if self.mode == "download-GET":
332 url = "/uri/%s" % uri
333 d = self.GET_discard(urllib.quote(url))
338 #print "CLIENT STARTED"
339 #print "FURL", self.control_furl
340 #print "RREF", self.control_rref
342 kB = 1000; MB = 1000*1000
346 d = self._print_usage()
347 d.addCallback(self.stash_stats, "0B")
350 d.addCallback(self._do_upload, 10*kB+i, files, uris)
351 d.addCallback(self._do_download, 10*kB+i, uris)
352 d.addCallback(self._print_usage)
353 d.addCallback(self.stash_stats, "10kB")
356 d.addCallback(self._do_upload, 10*MB+i, files, uris)
357 d.addCallback(self._do_download, 10*MB+i, uris)
358 d.addCallback(self._print_usage)
359 d.addCallback(self.stash_stats, "10MB")
362 d.addCallback(self._do_upload, 50*MB+i, files, uris)
363 d.addCallback(self._do_download, 50*MB+i, uris)
364 d.addCallback(self._print_usage)
365 d.addCallback(self.stash_stats, "50MB")
368 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
369 # d.addCallback(self._do_download, 100*MB+i, uris)
370 # d.addCallback(self._print_usage)
371 #d.addCallback(self.stash_stats, "100MB")
373 #d.addCallback(self.stall)
379 def stall(self, res):
381 reactor.callLater(5, d.callback, None)
385 class ClientWatcher(protocol.ProcessProtocol):
387 def outReceived(self, data):
389 def errReceived(self, data):
391 def processEnded(self, reason):
393 self.d.callback(None)
396 if __name__ == '__main__':
398 if len(sys.argv) > 1:
400 # put the logfile and stats.out in _test_memory/ . These stick around.
401 # put the nodes and other files in _test_memory/test/ . These are
402 # removed each time we run.
403 sf = SystemFramework("_test_memory", mode)