3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16 full_speed_ahead = False
19 def handleResponsePart(self, data):
20 self._bytes_so_far += len(data)
21 if not self.factory.do_stall:
23 if self.full_speed_ahead:
25 if self._bytes_so_far > 1e6+100:
28 self.transport.pauseProducing()
29 self.stalled = reactor.callLater(10.0, self._resume_speed)
30 def _resume_speed(self):
33 self.full_speed_ahead = True
34 self.transport.resumeProducing()
35 def handleResponseEnd(self):
40 return tw_client.HTTPPageGetter.handleResponseEnd(self)
42 class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
43 protocol = StallableHTTPGetterDiscarder
45 def discardPage(url, stall=False, *args, **kwargs):
46 """Start fetching the URL, but stall our pipe after the first 1MB.
47 Wait 10 seconds, then resume downloading (and discarding) everything.
49 # adapted from twisted.web.client.getPage . We can't just wrap or
50 # subclass because it provides no way to override the HTTPClientFactory
52 scheme, host, port, path = tw_client._parse(url)
53 factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
54 factory.do_stall = stall
55 assert scheme == 'http'
56 reactor.connectTCP(host, port, factory)
57 return factory.deferred
59 class SystemFramework(testutil.PollMixin):
62 def __init__(self, basedir, mode):
63 self.basedir = basedir = os.path.abspath(basedir)
64 if not basedir.startswith(os.path.abspath(".")):
65 raise AssertionError("safety issue: basedir must be a subdir")
66 self.testdir = testdir = os.path.join(basedir, "test")
67 if os.path.exists(testdir):
68 shutil.rmtree(testdir)
69 fileutil.make_dirs(testdir)
70 self.sparent = service.MultiService()
71 self.sparent.startService()
73 self.tub = foolscap.Tub()
74 self.tub.setServiceParent(self.sparent)
79 log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
81 #logfile = open(os.path.join(self.testdir, "log"), "w")
82 #flo = log.FileLogObserver(logfile)
83 #log.startLoggingWithObserver(flo.emit, setStdout=False)
84 d = eventual.fireEventually()
85 d.addCallback(lambda res: self.setUp())
86 d.addCallback(lambda res: self.record_initial_memusage())
87 d.addCallback(lambda res: self.make_nodes())
88 d.addCallback(lambda res: self.wait_for_client_connected())
89 d.addCallback(lambda res: self.do_test())
90 d.addBoth(self.tearDown)
102 self.failed.raiseException()
107 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
108 d = self.make_introducer_and_vdrive()
110 return self.start_client()
112 def _record_control_furl(control_furl):
113 self.control_furl = control_furl
114 #print "OBTAINING '%s'" % (control_furl,)
115 return self.tub.getReference(self.control_furl)
116 d.addCallback(_record_control_furl)
117 def _record_control(control_rref):
118 self.control_rref = control_rref
119 d.addCallback(_record_control)
121 #print "CLIENT READY"
123 d.addCallback(_ready)
126 def record_initial_memusage(self):
128 print "Client started (no connections yet)"
129 d = self._print_usage()
130 d.addCallback(self.stash_stats, "init")
133 def wait_for_client_connected(self):
135 print "Client connecting to other nodes.."
136 return self.control_rref.callRemote("wait_for_client_connections",
139 def tearDown(self, passthrough):
140 # the client node will shut down in a few seconds
141 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
142 log.msg("shutting down SystemTest services")
143 d = defer.succeed(None)
145 d.addCallback(lambda res: self.kill_client())
146 d.addCallback(lambda res: self.sparent.stopService())
147 d.addCallback(lambda res: eventual.flushEventualQueue())
148 def _close_statsfile(res):
149 self.statsfile.close()
150 d.addCallback(_close_statsfile)
151 d.addCallback(lambda res: passthrough)
154 def add_service(self, s):
155 s.setServiceParent(self.sparent)
158 def make_introducer_and_vdrive(self):
159 iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
161 iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
162 self.introducer_and_vdrive = self.add_service(iv)
163 d = self.introducer_and_vdrive.when_tub_ready()
164 def _introducer_ready(res):
165 q = self.introducer_and_vdrive
166 self.introducer_furl = q.urls["introducer"]
167 self.vdrive_furl = q.urls["vdrive"]
168 d.addCallback(_introducer_ready)
171 def make_nodes(self):
173 for i in range(self.numnodes):
174 nodedir = os.path.join(self.testdir, "node%d" % i)
176 f = open(os.path.join(nodedir, "introducer.furl"), "w")
177 f.write(self.introducer_furl)
179 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
180 f.write(self.vdrive_furl)
182 # the only tests for which we want the internal nodes to actually
183 # retain shares are the ones where somebody's going to download
185 if self.mode in ("download", "download-GET", "download-GET-slow"):
189 # for these tests, we tell the storage servers to pretend to
190 # accept shares, but really just throw them out, since we're
191 # only testing upload and not download.
192 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
193 f.write("no_storage\n")
195 if self.mode in ("receive",):
196 # for this mode, the client-under-test gets all the shares,
197 # so our internal nodes can refuse requests
198 f = open(os.path.join(nodedir, "sizelimit"), "w")
201 c = self.add_service(client.Client(basedir=nodedir))
203 # the peers will start running, eventually they will connect to each
204 # other and the introducer_and_vdrive
206 def touch_keepalive(self):
207 f = open(self.keepalive_file, "w")
209 If the node notices this file at startup, it will poll every 5 seconds and
210 terminate if the file is more than 10 seconds old, or if it has been deleted.
211 If the test harness has an internal failure and neglects to kill off the node
212 itself, this helps to avoid leaving processes lying around. The contents of
213 this file are ignored.
217 def start_client(self):
218 # this returns a Deferred that fires with the client's control.furl
219 log.msg("MAKING CLIENT")
220 clientdir = self.clientdir = os.path.join(self.testdir, "client")
222 create_node.create_client(clientdir, {}, out=quiet)
223 log.msg("DONE MAKING CLIENT")
224 f = open(os.path.join(clientdir, "introducer.furl"), "w")
225 f.write(self.introducer_furl + "\n")
227 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
228 f.write(self.vdrive_furl + "\n")
230 f = open(os.path.join(clientdir, "webport"), "w")
231 # TODO: ideally we would set webport=0 and then ask the node what
232 # port it picked. But at the moment it is not convenient to do this,
233 # so we just pick a relatively unique one.
234 webport = max(os.getpid(), 2000)
235 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
237 self.webish_url = "http://localhost:%d" % webport
238 if self.mode in ("upload-self", "receive"):
239 # accept and store shares, to trigger the memory consumption bugs
242 # don't accept any shares
243 f = open(os.path.join(clientdir, "sizelimit"), "w")
246 ## also, if we do receive any shares, throw them away
247 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
248 #f.write("no_storage\n")
250 if self.mode == "upload-self":
251 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
252 f.write("push_to_ourselves\n")
254 self.keepalive_file = os.path.join(clientdir,
255 "suicide_prevention_hotline")
256 # now start updating the mtime.
257 self.touch_keepalive()
258 ts = internet.TimerService(4.0, self.touch_keepalive)
259 ts.setServiceParent(self.sparent)
262 self.proc_done = pp.d = defer.Deferred()
263 logfile = os.path.join(self.basedir, "client.log")
264 cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
265 env = os.environ.copy()
266 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
267 log.msg("CLIENT STARTED")
269 # now we wait for the client to get started. we're looking for the
270 # control.furl file to appear.
271 furl_file = os.path.join(clientdir, "control.furl")
273 if pp.ended and pp.ended.value.status != 0:
274 # the twistd process ends normally (with rc=0) if the child
275 # is successfully launched. It ends abnormally (with rc!=0)
276 # if the child cannot be launched.
277 raise RuntimeError("process ended while waiting for startup")
278 return os.path.exists(furl_file)
279 d = self.poll(_check, 0.1)
280 # once it exists, wait a moment before we read from it, just in case
281 # it hasn't finished writing the whole thing. Ideally control.furl
282 # would be created in some atomic fashion, or made non-readable until
283 # it's ready, but I can't think of an easy way to do that, and I
284 # think the chances that we'll observe a half-write are pretty low.
286 d2 = defer.Deferred()
287 reactor.callLater(0.1, d2.callback, None)
289 d.addCallback(_stall)
291 f = open(furl_file, "r")
298 def kill_client(self):
299 # returns a Deferred that fires when the process exits. This may only
302 self.proc.signalProcess("INT")
303 except error.ProcessExitedAlready:
305 return self.proc_done
308 def create_data(self, name, size):
309 filename = os.path.join(self.testdir, name + ".data")
310 f = open(filename, "wb")
318 def stash_stats(self, stats, name):
319 self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
320 self.statsfile.flush()
321 self.stats[name] = stats['VmPeak']
323 def POST(self, urlpath, **fields):
324 url = self.webish_url + urlpath
325 sepbase = "boogabooga"
329 form.append('Content-Disposition: form-data; name="_charset"')
333 for name, value in fields.iteritems():
334 if isinstance(value, tuple):
335 filename, value = value
336 form.append('Content-Disposition: form-data; name="%s"; '
337 'filename="%s"' % (name, filename))
339 form.append('Content-Disposition: form-data; name="%s"' % name)
344 body = "\r\n".join(form) + "\r\n"
345 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
347 return tw_client.getPage(url, method="POST", postdata=body,
348 headers=headers, followRedirect=False)
350 def GET_discard(self, urlpath, stall):
351 url = self.webish_url + urlpath + "?filename=dummy-get.out"
352 return discardPage(url, stall)
354 def _print_usage(self, res=None):
355 d = self.control_rref.callRemote("get_memory_usage")
357 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
360 d.addCallback(_print)
363 def _do_upload(self, res, size, files, uris):
366 print "uploading %s" % name
367 if self.mode in ("upload", "upload-self"):
368 files[name] = self.create_data(name, size)
369 d = self.control_rref.callRemote("upload_from_file_to_uri",
372 os.remove(files[name])
376 elif self.mode == "upload-POST":
378 url = "/vdrive/global"
379 d = self.POST(url, t="upload", file=("%d.data" % size, data))
380 elif self.mode in ("receive",):
381 # upload the data from a local peer, so that the
382 # client-under-test receives and stores the shares
383 files[name] = self.create_data(name, size)
384 u = self.nodes[0].getServiceNamed("uploader")
385 d = u.upload_filename(files[name])
386 elif self.mode in ("download", "download-GET", "download-GET-slow"):
387 # upload the data from a local peer, then have the
388 # client-under-test download it.
389 files[name] = self.create_data(name, size)
390 u = self.nodes[0].getServiceNamed("uploader")
391 d = u.upload_filename(files[name])
393 raise RuntimeError("unknown mode=%s" % self.mode)
396 print "uploaded %s" % name
397 d.addCallback(_complete)
400 def _do_download(self, res, size, uris):
401 if self.mode not in ("download", "download-GET", "download-GET-slow"):
404 print "downloading %s" % name
407 if self.mode == "download":
408 d = self.control_rref.callRemote("download_from_uri_to_file",
410 elif self.mode == "download-GET":
411 url = "/uri/%s" % uri
412 d = self.GET_discard(urllib.quote(url), stall=False)
413 elif self.mode == "download-GET-slow":
414 url = "/uri/%s" % uri
415 d = self.GET_discard(urllib.quote(url), stall=True)
418 print "downloaded %s" % name
420 d.addCallback(_complete)
424 #print "CLIENT STARTED"
425 #print "FURL", self.control_furl
426 #print "RREF", self.control_rref
428 kB = 1000; MB = 1000*1000
432 d = self._print_usage()
433 d.addCallback(self.stash_stats, "0B")
436 d.addCallback(self._do_upload, 10*kB+i, files, uris)
437 d.addCallback(self._do_download, 10*kB+i, uris)
438 d.addCallback(self._print_usage)
439 d.addCallback(self.stash_stats, "10kB")
442 d.addCallback(self._do_upload, 10*MB+i, files, uris)
443 d.addCallback(self._do_download, 10*MB+i, uris)
444 d.addCallback(self._print_usage)
445 d.addCallback(self.stash_stats, "10MB")
448 d.addCallback(self._do_upload, 50*MB+i, files, uris)
449 d.addCallback(self._do_download, 50*MB+i, uris)
450 d.addCallback(self._print_usage)
451 d.addCallback(self.stash_stats, "50MB")
454 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
455 # d.addCallback(self._do_download, 100*MB+i, uris)
456 # d.addCallback(self._print_usage)
457 #d.addCallback(self.stash_stats, "100MB")
459 #d.addCallback(self.stall)
465 def stall(self, res):
467 reactor.callLater(5, d.callback, None)
471 class ClientWatcher(protocol.ProcessProtocol):
473 def outReceived(self, data):
475 def errReceived(self, data):
477 def processEnded(self, reason):
479 self.d.callback(None)
482 if __name__ == '__main__':
484 if len(sys.argv) > 1:
486 # put the logfile and stats.out in _test_memory/ . These stick around.
487 # put the nodes and other files in _test_memory/test/ . These are
488 # removed each time we run.
489 sf = SystemFramework("_test_memory", mode)