import os
import shutil
import stat
import sys
import time
import urllib
from cStringIO import StringIO

import foolscap
from foolscap import eventual
from twisted.application import service, internet
from twisted.internet import defer, reactor, protocol, error
from twisted.python import log
from twisted.web import client as tw_client

from allmydata import client, introducer_and_vdrive
from allmydata.scripts import create_node
from allmydata.util import testutil, fileutil
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    """Page getter that counts response bytes and can stall the pipe once.

    The response data is only counted, never stored. When the owning
    factory has ``do_stall`` set, the transport is paused the first time
    more than 1e6+100 bytes have arrived, and a 10 second timer resumes
    it. After that single pause the download runs at full speed.
    """
    full_speed_ahead = False
    # Class-level defaults: the byte counter must exist before the first
    # "+=", and "no timer pending" must be representable before any stall.
    _bytes_so_far = 0
    stalled = None

    def handleResponsePart(self, data):
        """Account for ``data`` and pause the transport when it is time."""
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            # we already stalled once; never pause again
            return
        if self._bytes_so_far > 1e6+100 and not self.stalled:
            self.transport.pauseProducing()
            self.stalled = reactor.callLater(10.0, self._resume_speed)

    def _resume_speed(self):
        """Timer callback: end the stall and let the transport run again."""
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()

    def handleResponseEnd(self):
        """Cancel any pending stall timer, then defer to the base class."""
        if self.stalled:
            # don't let the timer fire against a finished connection
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose connections use StallableHTTPGetterDiscarder."""

    # Twisted instantiates this class for each connection the factory makes.
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL and discard whatever comes back.

    If ``stall`` is true, stall our pipe after the first 1MB, wait 10
    seconds, then resume downloading (and discarding) everything.

    Any extra positional/keyword arguments are handed to
    StallableDiscardingHTTPClientFactory. Only 'http' URLs are accepted
    (AssertionError otherwise). Returns the factory's Deferred.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    scheme, host, port, _path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    # the protocol reads this flag to decide whether to stall at all
    factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred
59 class SystemFramework(testutil.PollMixin):
def __init__(self, basedir, mode):
    """Prepare the working directories and the parent service.

    basedir: directory that must be the current directory or lie beneath
             it; its "test" subdirectory is wiped and recreated.
    mode:    name of the test to run; stored as self.mode and compared
             against strings such as "upload" or "download" elsewhere.

    Raises AssertionError if basedir is outside the current directory.
    """
    self.basedir = basedir = os.path.abspath(basedir)
    here = os.path.abspath(".")
    # Compare on a path-component boundary: a bare startswith() would also
    # accept a sibling like "<cwd>-other", and we rmtree() below.
    inside = (basedir == here or
              basedir.startswith(here.rstrip(os.sep) + os.sep))
    if not inside:
        raise AssertionError("safety issue: basedir must be a subdir")
    self.testdir = testdir = os.path.join(basedir, "test")
    if os.path.exists(testdir):
        shutil.rmtree(testdir)
    fileutil.make_dirs(testdir)
    self.sparent = service.MultiService()
    self.sparent.startService()
    self.tub = foolscap.Tub()
    self.tub.setServiceParent(self.sparent)
    # State that other methods read: the mode string, the per-label stats
    # table, the list of local nodes, and the failure slot.
    self.mode = mode
    self.stats = {}
    self.nodes = []
    self.failed = None
    # NOTE(review): self.numnodes is read by make_nodes() and _do_upload(),
    # but its assignment is not visible in this excerpt -- confirm where it
    # is set.
    self.keepalive_file = None
80 framelog = os.path.join(self.basedir, "driver.log")
81 log.startLogging(open(framelog, "a"), setStdout=False)
82 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
83 #logfile = open(os.path.join(self.testdir, "log"), "w")
84 #flo = log.FileLogObserver(logfile)
85 #log.startLoggingWithObserver(flo.emit, setStdout=False)
86 d = eventual.fireEventually()
87 d.addCallback(lambda res: self.setUp())
88 d.addCallback(lambda res: self.record_initial_memusage())
89 d.addCallback(lambda res: self.make_nodes())
90 d.addCallback(lambda res: self.wait_for_client_connected())
91 d.addCallback(lambda res: self.do_test())
92 d.addBoth(self.tearDown)
104 # raiseException doesn't work for CopiedFailures
105 self.failed.raiseException()
110 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
111 d = self.make_introducer_and_vdrive()
113 return self.start_client()
115 def _record_control_furl(control_furl):
116 self.control_furl = control_furl
117 #print "OBTAINING '%s'" % (control_furl,)
118 return self.tub.getReference(self.control_furl)
119 d.addCallback(_record_control_furl)
120 def _record_control(control_rref):
121 self.control_rref = control_rref
122 d.addCallback(_record_control)
124 #print "CLIENT READY"
126 d.addCallback(_ready)
129 def record_initial_memusage(self):
131 print "Client started (no connections yet)"
132 d = self._print_usage()
133 d.addCallback(self.stash_stats, "init")
136 def wait_for_client_connected(self):
138 print "Client connecting to other nodes.."
139 return self.control_rref.callRemote("wait_for_client_connections",
def tearDown(self, passthrough):
    """Stop the client process and our own services, then close stats.

    Meant for addBoth(): ``passthrough`` (the previous result or Failure)
    is handed back unchanged once cleanup has finished. Returns a
    Deferred, so the caller's chain actually waits for the shutdown.
    """
    # the client node will shut down in a few seconds
    #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
    log.msg("shutting down SystemTest services")
    if self.keepalive_file and os.path.exists(self.keepalive_file):
        age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
        log.msg("keepalive file at shutdown was %ds old" % age)
    d = defer.succeed(None)
    d.addCallback(lambda res: self.kill_client())
    d.addCallback(lambda res: self.sparent.stopService())
    d.addCallback(lambda res: eventual.flushEventualQueue())
    def _close_statsfile(res):
        self.statsfile.close()
    d.addCallback(_close_statsfile)
    d.addCallback(lambda res: passthrough)
    return d
def add_service(self, s):
    """Attach service ``s`` to our parent MultiService and return it.

    Returning the service lets callers write ``x = self.add_service(...)``.
    """
    s.setServiceParent(self.sparent)
    return s
def make_introducer_and_vdrive(self):
    """Start the IntroducerAndVdrive service and record its FURLs.

    Returns a Deferred that fires after the service's tub is ready and
    self.introducer_furl / self.vdrive_furl have been set.
    """
    iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
    iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
    # keep our own reference rather than relying on add_service()'s result
    self.add_service(iv)
    self.introducer_and_vdrive = iv
    d = iv.when_tub_ready()
    def _introducer_ready(res):
        q = self.introducer_and_vdrive
        self.introducer_furl = q.urls["introducer"]
        self.vdrive_furl = q.urls["vdrive"]
    d.addCallback(_introducer_ready)
    return d
177 def make_nodes(self):
179 for i in range(self.numnodes):
180 nodedir = os.path.join(self.testdir, "node%d" % i)
182 f = open(os.path.join(nodedir, "introducer.furl"), "w")
183 f.write(self.introducer_furl)
185 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
186 f.write(self.vdrive_furl)
188 # the only tests for which we want the internal nodes to actually
189 # retain shares are the ones where somebody's going to download
191 if self.mode in ("download", "download-GET", "download-GET-slow"):
195 # for these tests, we tell the storage servers to pretend to
196 # accept shares, but really just throw them out, since we're
197 # only testing upload and not download.
198 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
199 f.write("no_storage\n")
201 if self.mode in ("receive",):
202 # for this mode, the client-under-test gets all the shares,
203 # so our internal nodes can refuse requests
204 f = open(os.path.join(nodedir, "sizelimit"), "w")
207 c = self.add_service(client.Client(basedir=nodedir))
209 # the peers will start running, eventually they will connect to each
210 # other and the introducer_and_vdrive
def touch_keepalive(self):
    """Rewrite the keepalive file so that its mtime is refreshed.

    If the file already exists, its age is logged first. The text written
    is only an explanation for humans.
    """
    if os.path.exists(self.keepalive_file):
        age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
        log.msg("touching keepalive file, was %ds old" % age)
    explanation = (
        "If the node notices this file at startup, it will poll every 5 seconds and\n"
        "terminate if the file is more than 10 seconds old, or if it has been deleted.\n"
        "If the test harness has an internal failure and neglects to kill off the node\n"
        "itself, this helps to avoid leaving processes lying around. The contents of\n"
        "this file are ignored.\n"
    )
    f = open(self.keepalive_file, "w")
    try:
        f.write(explanation)
    finally:
        # always release the handle; this runs once per second
        f.close()
226 def start_client(self):
227 # this returns a Deferred that fires with the client's control.furl
228 log.msg("MAKING CLIENT")
229 clientdir = self.clientdir = os.path.join(self.testdir, "client")
231 create_node.create_client(clientdir, {}, out=quiet)
232 log.msg("DONE MAKING CLIENT")
233 f = open(os.path.join(clientdir, "introducer.furl"), "w")
234 f.write(self.introducer_furl + "\n")
236 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
237 f.write(self.vdrive_furl + "\n")
239 f = open(os.path.join(clientdir, "webport"), "w")
240 # TODO: ideally we would set webport=0 and then ask the node what
241 # port it picked. But at the moment it is not convenient to do this,
242 # so we just pick a relatively unique one.
243 webport = max(os.getpid(), 2000)
244 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
246 self.webish_url = "http://localhost:%d" % webport
247 if self.mode in ("upload-self", "receive"):
248 # accept and store shares, to trigger the memory consumption bugs
251 # don't accept any shares
252 f = open(os.path.join(clientdir, "sizelimit"), "w")
255 ## also, if we do receive any shares, throw them away
256 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
257 #f.write("no_storage\n")
259 if self.mode == "upload-self":
260 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
261 f.write("push_to_ourselves\n")
263 self.keepalive_file = os.path.join(clientdir,
264 "suicide_prevention_hotline")
265 # now start updating the mtime.
266 self.touch_keepalive()
267 ts = internet.TimerService(1.0, self.touch_keepalive)
268 ts.setServiceParent(self.sparent)
271 self.proc_done = pp.d = defer.Deferred()
272 logfile = os.path.join(self.basedir, "client.log")
273 cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
274 env = os.environ.copy()
275 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
276 log.msg("CLIENT STARTED")
278 # now we wait for the client to get started. we're looking for the
279 # control.furl file to appear.
280 furl_file = os.path.join(clientdir, "control.furl")
282 if pp.ended and pp.ended.value.status != 0:
283 # the twistd process ends normally (with rc=0) if the child
284 # is successfully launched. It ends abnormally (with rc!=0)
285 # if the child cannot be launched.
286 raise RuntimeError("process ended while waiting for startup")
287 return os.path.exists(furl_file)
288 d = self.poll(_check, 0.1)
289 # once it exists, wait a moment before we read from it, just in case
290 # it hasn't finished writing the whole thing. Ideally control.furl
291 # would be created in some atomic fashion, or made non-readable until
292 # it's ready, but I can't think of an easy way to do that, and I
293 # think the chances that we'll observe a half-write are pretty low.
295 d2 = defer.Deferred()
296 reactor.callLater(0.1, d2.callback, None)
298 d.addCallback(_stall)
300 f = open(furl_file, "r")
def kill_client(self):
    """Send SIGINT to the client process.

    Returns self.proc_done, the Deferred that fires when the process
    exits. A process that has already exited is not an error.
    """
    try:
        self.proc.signalProcess("INT")
    except error.ProcessExitedAlready:
        # nothing left to signal; proc_done still reports the exit
        pass
    return self.proc_done
def create_data(self, name, size):
    """Create ``<testdir>/<name>.data`` holding ``size`` bytes.

    Returns the path of the new file; callers later upload or remove it.
    """
    filename = os.path.join(self.testdir, name + ".data")
    # NOTE(review): the fill byte is arbitrary filler; the excerpt does not
    # show what the original wrote. encode() keeps this a byte string on
    # every Python version.
    block = "a".encode("ascii") * 8192
    f = open(filename, "wb")
    try:
        remaining = size
        # write in fixed-size pieces so large files never sit in memory
        while remaining > 0:
            piece = min(remaining, len(block))
            f.write(block[:piece])
            remaining -= piece
    finally:
        f.close()
    return filename
def stash_stats(self, stats, name):
    """Record the 'VmPeak' entry of ``stats`` under the label ``name``.

    One "<mode> <name>: <VmPeak>" line is appended to self.statsfile and
    flushed right away, and the same figure is kept in self.stats[name].
    """
    peak = stats['VmPeak']
    record = "%s %s: %d\n" % (self.mode, name, peak)
    out = self.statsfile
    out.write(record)
    out.flush()
    self.stats[name] = peak
332 def POST(self, urlpath, **fields):
333 url = self.webish_url + urlpath
334 sepbase = "boogabooga"
338 form.append('Content-Disposition: form-data; name="_charset"')
342 for name, value in fields.iteritems():
343 if isinstance(value, tuple):
344 filename, value = value
345 form.append('Content-Disposition: form-data; name="%s"; '
346 'filename="%s"' % (name, filename))
348 form.append('Content-Disposition: form-data; name="%s"' % name)
353 body = "\r\n".join(form) + "\r\n"
354 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
356 return tw_client.getPage(url, method="POST", postdata=body,
357 headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
    """GET ``urlpath`` from the client's web port via discardPage().

    ``stall`` is passed straight through to discardPage(), whose Deferred
    is returned.
    """
    pieces = (self.webish_url, urlpath, "?filename=dummy-get.out")
    target = "".join(pieces)
    return discardPage(target, stall)
363 def _print_usage(self, res=None):
364 d = self.control_rref.callRemote("get_memory_usage")
366 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
369 d.addCallback(_print)
372 def _do_upload(self, res, size, files, uris):
375 print "uploading %s" % name
376 if self.mode in ("upload", "upload-self"):
377 files[name] = self.create_data(name, size)
378 d = self.control_rref.callRemote("upload_from_file_to_uri",
381 os.remove(files[name])
385 elif self.mode == "upload-POST":
387 url = "/vdrive/global"
388 d = self.POST(url, t="upload", file=("%d.data" % size, data))
389 elif self.mode in ("receive",
390 "download", "download-GET", "download-GET-slow"):
391 # mode=receive: upload the data from a local peer, so that the
392 # client-under-test receives and stores the shares
394 # mode=download*: upload the data from a local peer, then have
395 # the client-under-test download it.
397 # we need to wait until the uploading node has connected to all
398 # peers, since the wait_for_client_connections() above doesn't
399 # pay attention to our self.nodes[] and their connections.
400 files[name] = self.create_data(name, size)
401 u = self.nodes[0].getServiceNamed("uploader")
402 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
403 d.addCallback(lambda res: u.upload_filename(files[name]))
405 raise RuntimeError("unknown mode=%s" % self.mode)
408 print "uploaded %s" % name
409 d.addCallback(_complete)
412 def _do_download(self, res, size, uris):
413 if self.mode not in ("download", "download-GET", "download-GET-slow"):
416 print "downloading %s" % name
419 if self.mode == "download":
420 d = self.control_rref.callRemote("download_from_uri_to_file",
422 elif self.mode == "download-GET":
423 url = "/uri/%s" % uri
424 d = self.GET_discard(urllib.quote(url), stall=False)
425 elif self.mode == "download-GET-slow":
426 url = "/uri/%s" % uri
427 d = self.GET_discard(urllib.quote(url), stall=True)
430 print "downloaded %s" % name
432 d.addCallback(_complete)
436 #print "CLIENT STARTED"
437 #print "FURL", self.control_furl
438 #print "RREF", self.control_rref
440 kB = 1000; MB = 1000*1000
444 d = self._print_usage()
445 d.addCallback(self.stash_stats, "0B")
448 d.addCallback(self._do_upload, 10*kB+i, files, uris)
449 d.addCallback(self._do_download, 10*kB+i, uris)
450 d.addCallback(self._print_usage)
451 d.addCallback(self.stash_stats, "10kB")
454 d.addCallback(self._do_upload, 10*MB+i, files, uris)
455 d.addCallback(self._do_download, 10*MB+i, uris)
456 d.addCallback(self._print_usage)
457 d.addCallback(self.stash_stats, "10MB")
460 d.addCallback(self._do_upload, 50*MB+i, files, uris)
461 d.addCallback(self._do_download, 50*MB+i, uris)
462 d.addCallback(self._print_usage)
463 d.addCallback(self.stash_stats, "50MB")
466 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
467 # d.addCallback(self._do_download, 100*MB+i, uris)
468 # d.addCallback(self._print_usage)
469 #d.addCallback(self.stash_stats, "100MB")
471 #d.addCallback(self.stall)
def stall(self, res):
    """Return a Deferred that fires with None five seconds from now.

    ``res`` is accepted so this can be used as a Deferred callback; it is
    not used.
    """
    d = defer.Deferred()
    reactor.callLater(5, d.callback, None)
    return d
483 class ClientWatcher(protocol.ProcessProtocol):
485 def outReceived(self, data):
487 def errReceived(self, data):
489 def processEnded(self, reason):
491 self.d.callback(None)
494 if __name__ == '__main__':
496 if len(sys.argv) > 1:
498 # put the logfile and stats.out in _test_memory/ . These stick around.
499 # put the nodes and other files in _test_memory/test/ . These are
500 # removed each time we run.
501 sf = SystemFramework("_test_memory", mode)