3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer, upload
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
# HTTP page getter used by discardPage(). It counts response-body bytes in
# self._bytes_so_far and, when the factory's do_stall flag is set, pauses the
# transport once more than 1e6+100 bytes have arrived, scheduling
# _resume_speed() to run 10 seconds later.
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
# Set to True by _resume_speed() once the stall period is over.
16 full_speed_ahead = False
# Called with each chunk of the response body.
# NOTE(review): `+=` below needs _bytes_so_far to be initialised (e.g. as a
# class attribute) before the first chunk — confirm.
19 def handleResponsePart(self, data):
20 self._bytes_so_far += len(data)
# NOTE(review): the bodies of these two guards are presumably early returns
# (stalling not requested / stall already finished) — confirm.
21 if not self.factory.do_stall:
23 if self.full_speed_ahead:
# Roughly 1MB received: stop reading from the socket and keep the timer
# handle in self.stalled.
25 if self._bytes_so_far > 1e6+100:
28 self.transport.pauseProducing()
29 self.stalled = reactor.callLater(10.0, self._resume_speed)
# Timer callback: record that the stall is over and resume reading.
30 def _resume_speed(self):
33 self.full_speed_ahead = True
34 self.transport.resumeProducing()
# Response finished; delegate to HTTPPageGetter.
# NOTE(review): a still-pending self.stalled timer presumably has to be
# cancelled before this point — confirm that it is.
35 def handleResponseEnd(self):
40 return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose connections are driven by
    StallableHTTPGetterDiscarder; built by discardPage()."""

    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL and discard what is received.

    If *stall* is true, stall our pipe after the first 1MB, wait 10 seconds,
    then resume downloading (and discarding) everything.

    :param url: an ``http://`` URL to fetch.
    :param stall: whether to perform the one-time stall described above.
    Any further positional/keyword arguments are passed through to
    StallableDiscardingHTTPClientFactory.
    :returns: the factory's Deferred.
    :raises AssertionError: if *url* is not an ``http`` URL.
    """
    # Adapted from twisted.web.client.getPage. We can't just wrap or
    # subclass getPage because it offers no way to substitute our own
    # HTTPClientFactory subclass, so we build the factory ourselves.
    scheme, host, port, _path = tw_client._parse(url)
    # Validate before building the factory. Use an explicit raise (as the
    # SystemFramework safety check does) rather than `assert`, because
    # `python -O` strips asserts. A non-http URL would then be silently
    # fetched over a plain TCP connection.
    if scheme != 'http':
        raise AssertionError("discardPage only handles http URLs, not %r"
                             % (url,))
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
# Test harness: creates an introducer and in-process storage nodes, spawns
# the client-under-test as a separate twistd process, and measures that
# client's memory use (via its control port) for the selected mode.
59 class SystemFramework(testutil.PollMixin):
# basedir: where driver.log, client.log and stats.out are kept; its "test"
# subdirectory is wiped and recreated each time.
# mode: selects the memory test to run (e.g. "upload", "download-GET").
62 def __init__(self, basedir, mode):
63 self.basedir = basedir = os.path.abspath(basedir)
# Refuse to work outside the current directory, since testdir is rmtree'd.
# NOTE(review): this is a plain string-prefix test, so a sibling such as
# "/work2" passes when the cwd is "/work"; comparing against the cwd plus
# os.sep would be stricter.
64 if not basedir.startswith(os.path.abspath(".")):
65 raise AssertionError("safety issue: basedir must be a subdir")
66 self.testdir = testdir = os.path.join(basedir, "test")
67 if os.path.exists(testdir):
68 shutil.rmtree(testdir)
69 fileutil.make_dirs(testdir)
# Parent service for everything run in-process (Tub, introducer, storage
# nodes, keepalive timer); started immediately.
70 self.sparent = service.MultiService()
71 self.sparent.startService()
# Tub used to obtain a reference to the client's control.furl.
# NOTE(review): requires `import foolscap` at module level — confirm present.
73 self.tub = foolscap.Tub()
74 self.tub.setServiceParent(self.sparent)
# Path of the client's keepalive file; set by start_client().
77 self.keepalive_file = None
# Send twisted log output to <basedir>/driver.log (appending; stdout is not
# redirected).
80 framelog = os.path.join(self.basedir, "driver.log")
81 log.startLogging(open(framelog, "a"), setStdout=False)
82 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
83 #logfile = open(os.path.join(self.testdir, "log"), "w")
84 #flo = log.FileLogObserver(logfile)
85 #log.startLoggingWithObserver(flo.emit, setStdout=False)
# Phases run sequentially, starting on a later reactor turn:
# setUp -> record_initial_memusage -> make_nodes -> wait_for_client_connected
# -> do_test. tearDown is attached with addBoth, so it runs after success
# or failure.
86 d = eventual.fireEventually()
87 d.addCallback(lambda res: self.setUp())
88 d.addCallback(lambda res: self.record_initial_memusage())
89 d.addCallback(lambda res: self.make_nodes())
90 d.addCallback(lambda res: self.wait_for_client_connected())
91 d.addCallback(lambda res: self.do_test())
92 d.addBoth(self.tearDown)
# Re-raise a recorded failure to the caller. NOTE(review): self.failed is
# presumably set by an errback on d — confirm.
104 # raiseException doesn't work for CopiedFailures
105 self.failed.raiseException()
# Open stats.out for appending, start the introducer and then the client,
# and once the client's control.furl is known, connect to it through
# self.tub.
110 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
111 d = self.make_introducer()
113 return self.start_client()
# start_client()'s Deferred fires with the control furl; keep it and ask the
# Tub for a RemoteReference to it.
115 def _record_control_furl(control_furl):
116 self.control_furl = control_furl
117 #print "OBTAINING '%s'" % (control_furl,)
118 return self.tub.getReference(self.control_furl)
119 d.addCallback(_record_control_furl)
# Keep the control RemoteReference; later phases call methods on it.
120 def _record_control(control_rref):
121 self.control_rref = control_rref
122 d.addCallback(_record_control)
124 #print "CLIENT READY"
126 d.addCallback(_ready)
# Print the client's memory usage and record it under "init". run() calls
# this before make_nodes(), i.e. before the client has any peers.
129 def record_initial_memusage(self):
131 print "Client started (no connections yet)"
132 d = self._print_usage()
133 d.addCallback(self.stash_stats, "init")
# Ask the client, via its control port, to wait until it has connected.
# NOTE(review): the remaining callRemote argument is on a line not shown;
# presumably the number of connections to wait for — confirm.
136 def wait_for_client_connected(self):
138 print "Client connecting to other nodes.."
139 return self.control_rref.callRemote("wait_for_client_connections",
# Shut everything down, then hand back `passthrough` unchanged. run() adds
# this with addBoth, so passthrough may be a result or a Failure.
142 def tearDown(self, passthrough):
143 # the client node will shut down in a few seconds
144 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
145 log.msg("shutting down SystemTest services")
# Log how old the keepalive file is at shutdown.
146 if self.keepalive_file and os.path.exists(self.keepalive_file):
147 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
148 log.msg("keepalive file at shutdown was %ds old" % age)
# Steps run in order: kill the client process, stop in-process services,
# drain foolscap's eventual-send queue, close stats.out.
149 d = defer.succeed(None)
151 d.addCallback(lambda res: self.kill_client())
152 d.addCallback(lambda res: self.sparent.stopService())
153 d.addCallback(lambda res: eventual.flushEventualQueue())
# NOTE(review): self.statsfile appears to be assigned only in setUp. If
# setUp failed before opening it, this raises AttributeError and replaces
# the original failure. Consider guarding.
154 def _close_statsfile(res):
155 self.statsfile.close()
156 d.addCallback(_close_statsfile)
# Restore the original result/Failure for the caller.
157 d.addCallback(lambda res: passthrough)
# Attach service `s` to self.sparent.
# NOTE(review): callers use the return value (self.introducer = ...,
# c = ...), so this presumably returns s — confirm.
160 def add_service(self, s):
161 s.setServiceParent(self.sparent)
# Start an introducer node under <testdir>/introducer as a child service.
# Once its Tub is ready, record its FURL in self.introducer_furl, which the
# storage nodes and the client are later pointed at.
164 def make_introducer(self):
165 iv_basedir = os.path.join(self.testdir, "introducer")
167 iv = introducer.IntroducerNode(basedir=iv_basedir)
168 self.introducer = self.add_service(iv)
169 d = self.introducer.when_tub_ready()
# NOTE(review): `q` is bound on a line not shown; presumably self.introducer.
170 def _introducer_ready(res):
172 self.introducer_furl = q.introducer_url
173 d.addCallback(_introducer_ready)
# Create self.numnodes storage nodes under <testdir>/node<i>. Each is given
# the introducer's FURL and mode-specific storage settings, and is started
# as a child service of self.sparent.
176 def make_nodes(self):
178 for i in range(self.numnodes):
179 nodedir = os.path.join(self.testdir, "node%d" % i)
# Each node finds the introducer through its introducer.furl file.
181 f = open(os.path.join(nodedir, "introducer.furl"), "w")
182 f.write(self.introducer_furl)
184 # the only tests for which we want the internal nodes to actually
185 # retain shares are the ones where somebody's going to download
187 if self.mode in ("download", "download-GET", "download-GET-slow"):
191 # for these tests, we tell the storage servers to pretend to
192 # accept shares, but really just throw them out, since we're
193 # only testing upload and not download.
194 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
195 f.write("no_storage\n")
197 if self.mode in ("receive",):
198 # for this mode, the client-under-test gets all the shares,
199 # so our internal nodes can refuse requests
200 f = open(os.path.join(nodedir, "sizelimit"), "w")
# NOTE(review): _do_upload uses self.nodes[0], so `c` is presumably appended
# to self.nodes on a line not shown — confirm.
203 c = self.add_service(client.Client(basedir=nodedir))
205 # the peers will start running, eventually they will connect to each
206 # other and the introducer
# Rewrite the client's keepalive file so its mtime is current, first logging
# how old the previous copy was. start_client() calls this once and then
# every second via a TimerService. The explanatory text that follows is
# (presumably) what gets written into the file.
208 def touch_keepalive(self):
209 if os.path.exists(self.keepalive_file):
210 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
211 log.msg("touching keepalive file, was %ds old" % age)
212 f = open(self.keepalive_file, "w")
214 If the node notices this file at startup, it will poll every 5 seconds and
215 terminate if the file is more than 10 seconds old, or if it has been deleted.
216 If the test harness has an internal failure and neglects to kill off the node
217 itself, this helps to avoid leaving processes lying around. The contents of
218 this file are ignored.
# Create and configure the client node directory for self.mode, launch it as
# a separate `twistd` process, and wait for its private/control.furl file
# to appear.
222 def start_client(self):
223 # this returns a Deferred that fires with the client's control.furl
224 log.msg("MAKING CLIENT")
225 clientdir = self.clientdir = os.path.join(self.testdir, "client")
# NOTE(review): `quiet` is bound on a line not shown; presumably a StringIO
# that swallows create_client's output — confirm.
227 create_node.create_client(clientdir, {}, out=quiet)
228 log.msg("DONE MAKING CLIENT")
# Point the client at our introducer.
229 f = open(os.path.join(clientdir, "introducer.furl"), "w")
230 f.write(self.introducer_furl + "\n")
# Web server port for the client (loopback only); POST() and GET_discard()
# build their URLs from self.webish_url.
232 f = open(os.path.join(clientdir, "webport"), "w")
233 # TODO: ideally we would set webport=0 and then ask the node what
234 # port it picked. But at the moment it is not convenient to do this,
235 # so we just pick a relatively unique one.
# NOTE(review): os.getpid() is not bounded below 65536 on every platform
# (Linux's pid_max can be raised past it), so this can produce an invalid
# TCP port.
236 webport = max(os.getpid(), 2000)
237 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
239 self.webish_url = "http://localhost:%d" % webport
# Mode-dependent storage policy for the client itself.
240 if self.mode in ("upload-self", "receive"):
241 # accept and store shares, to trigger the memory consumption bugs
244 # don't accept any shares
245 f = open(os.path.join(clientdir, "sizelimit"), "w")
248 ## also, if we do receive any shares, throw them away
249 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
250 #f.write("no_storage\n")
252 if self.mode == "upload-self":
253 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
254 f.write("push_to_ourselves\n")
# Keepalive: touch the file now and then every second.
256 self.keepalive_file = os.path.join(clientdir,
257 "suicide_prevention_hotline")
258 # now start updating the mtime.
259 self.touch_keepalive()
260 ts = internet.TimerService(1.0, self.touch_keepalive)
261 ts.setServiceParent(self.sparent)
# NOTE(review): `pp` is bound on a line not shown; presumably a
# ClientWatcher. pp.d / self.proc_done are what kill_client() returns.
264 self.proc_done = pp.d = defer.Deferred()
265 logfile = os.path.join(self.basedir, "client.log")
266 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
267 env = os.environ.copy()
268 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
269 log.msg("CLIENT STARTED")
271 # now we wait for the client to get started. we're looking for the
272 # control.furl file to appear.
273 furl_file = os.path.join(clientdir, "private", "control.furl")
# Poll predicate (run every 0.1s): fail if the process has already ended
# with a non-zero status, otherwise report whether control.furl exists.
275 if pp.ended and pp.ended.value.status != 0:
276 # the twistd process ends normally (with rc=0) if the child
277 # is successfully launched. It ends abnormally (with rc!=0)
278 # if the child cannot be launched.
279 raise RuntimeError("process ended while waiting for startup")
280 return os.path.exists(furl_file)
281 d = self.poll(_check, 0.1)
282 # once it exists, wait a moment before we read from it, just in case
283 # it hasn't finished writing the whole thing. Ideally control.furl
284 # would be created in some atomic fashion, or made non-readable until
285 # it's ready, but I can't think of an easy way to do that, and I
286 # think the chances that we'll observe a half-write are pretty low.
288 d2 = defer.Deferred()
289 reactor.callLater(0.1, d2.callback, None)
291 d.addCallback(_stall)
293 f = open(furl_file, "r")
# Send SIGINT ("INT") to the client process, tolerating a process that has
# already exited, and return self.proc_done (the Deferred start_client gave
# to the process protocol as pp.d).
300 def kill_client(self):
301 # returns a Deferred that fires when the process exits. This may only
304 self.proc.signalProcess("INT")
305 except error.ProcessExitedAlready:
307 return self.proc_done
# Create the test file <testdir>/<name>.data in binary mode.
# NOTE(review): callers store the return value in files[name] and later
# os.remove() it, so the rest of the body (not shown) presumably writes
# `size` bytes and returns filename — confirm.
310 def create_data(self, name, size):
311 filename = os.path.join(self.testdir, name + ".data")
312 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the VmPeak figure from *stats* under the label *name*.

    A "<mode> <name>: <VmPeak>" line is appended to stats.out and flushed
    straight away, and the value is also kept in self.stats[name].
    """
    record = "%s %s: %d\n" % (self.mode, name, stats['VmPeak'])
    sink = self.statsfile
    sink.write(record)
    sink.flush()
    self.stats[name] = stats['VmPeak']
# POST a multipart/form-data request to the client's web server at urlpath.
# Each keyword field becomes one form part; a (filename, data) tuple value
# is sent as a file part. A "_charset" part is also included. Redirects are
# not followed; returns getPage's Deferred.
# NOTE(review): the boundary is the fixed string "boogabooga"; field data
# containing it would corrupt the body. A random boundary would be safer.
325 def POST(self, urlpath, **fields):
326 url = self.webish_url + urlpath
327 sepbase = "boogabooga"
331 form.append('Content-Disposition: form-data; name="_charset"')
335 for name, value in fields.iteritems():
336 if isinstance(value, tuple):
337 filename, value = value
338 form.append('Content-Disposition: form-data; name="%s"; '
339 'filename="%s"' % (name, filename))
341 form.append('Content-Disposition: form-data; name="%s"' % name)
346 body = "\r\n".join(form) + "\r\n"
347 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
349 return tw_client.getPage(url, method="POST", postdata=body,
350 headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
    """GET *urlpath* from the client's web server and throw the body away.

    A ``filename=dummy-get.out`` query argument is appended to the URL.
    *stall* is forwarded to discardPage(), whose Deferred is returned.
    """
    target = "".join([self.webish_url, urlpath, "?filename=dummy-get.out"])
    return discardPage(target, stall)
# Ask the client (over its control port) for its memory statistics and print
# VmSize/VmPeak. The `res` parameter lets do_test chain this directly as a
# Deferred callback.
# NOTE(review): callers chain stash_stats on the result, so the returned
# Deferred presumably fires with the stats dict — confirm.
356 def _print_usage(self, res=None):
357 d = self.control_rref.callRemote("get_memory_usage")
359 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
362 d.addCallback(_print)
# Upload one test file of `size` bytes the way self.mode dictates:
# - "upload"/"upload-self": the client uploads a local file via its control
#   port, and the local file is removed afterwards;
# - "upload-POST": the data is POSTed to the client's web API;
# - "receive"/"download*": storage node 0 does the upload.
# Any other mode raises RuntimeError.
# NOTE(review): `name`, `url`, `data` and the _complete callback are bound on
# lines not shown; _complete presumably records the URI in uris — confirm.
365 def _do_upload(self, res, size, files, uris):
368 print "uploading %s" % name
369 if self.mode in ("upload", "upload-self"):
370 files[name] = self.create_data(name, size)
371 d = self.control_rref.callRemote("upload_from_file_to_uri",
374 os.remove(files[name])
378 elif self.mode == "upload-POST":
381 d = self.POST(url, t="upload", file=("%d.data" % size, data))
382 elif self.mode in ("receive",
383 "download", "download-GET", "download-GET-slow"):
384 # mode=receive: upload the data from a local peer, so that the
385 # client-under-test receives and stores the shares
387 # mode=download*: upload the data from a local peer, then have
388 # the client-under-test download it.
390 # we need to wait until the uploading node has connected to all
391 # peers, since the wait_for_client_connections() above doesn't
392 # pay attention to our self.nodes[] and their connections.
393 files[name] = self.create_data(name, size)
394 u = self.nodes[0].getServiceNamed("uploader")
395 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
396 d.addCallback(lambda res: u.upload(upload.FileName(files[name])))
398 raise RuntimeError("unknown mode=%s" % self.mode)
401 print "uploaded %s" % name
402 d.addCallback(_complete)
# In the download* modes, have the client fetch the file uploaded for
# `size`: "download" uses its control port, while "download-GET" and
# "download-GET-slow" do an HTTP GET whose body is discarded (the -slow
# variant stalls after ~1MB). Other modes presumably return early — the
# guard's body is on a line not shown.
405 def _do_download(self, res, size, uris):
406 if self.mode not in ("download", "download-GET", "download-GET-slow"):
409 print "downloading %s" % name
412 if self.mode == "download":
413 d = self.control_rref.callRemote("download_from_uri_to_file",
415 elif self.mode == "download-GET":
416 url = "/uri/%s" % uri
417 d = self.GET_discard(urllib.quote(url), stall=False)
418 elif self.mode == "download-GET-slow":
419 url = "/uri/%s" % uri
420 d = self.GET_discard(urllib.quote(url), stall=True)
423 print "downloaded %s" % name
425 d.addCallback(_complete)
# Main measurement sequence. Record a baseline ("0B"), then for 10kB, 10MB
# and 50MB files upload (and, in download modes, download), printing and
# stashing memory stats for each size group.
# NOTE(review): `i`, `files` and `uris` come from lines not shown;
# presumably a loop index (making each file size distinct) and shared
# dicts — confirm.
429 #print "CLIENT STARTED"
430 #print "FURL", self.control_furl
431 #print "RREF", self.control_rref
433 kB = 1000; MB = 1000*1000
437 d = self._print_usage()
438 d.addCallback(self.stash_stats, "0B")
441 d.addCallback(self._do_upload, 10*kB+i, files, uris)
442 d.addCallback(self._do_download, 10*kB+i, uris)
443 d.addCallback(self._print_usage)
444 d.addCallback(self.stash_stats, "10kB")
447 d.addCallback(self._do_upload, 10*MB+i, files, uris)
448 d.addCallback(self._do_download, 10*MB+i, uris)
449 d.addCallback(self._print_usage)
450 d.addCallback(self.stash_stats, "10MB")
453 d.addCallback(self._do_upload, 50*MB+i, files, uris)
454 d.addCallback(self._do_download, 50*MB+i, uris)
455 d.addCallback(self._print_usage)
456 d.addCallback(self.stash_stats, "50MB")
459 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
460 # d.addCallback(self._do_download, 100*MB+i, uris)
461 # d.addCallback(self._print_usage)
462 #d.addCallback(self.stash_stats, "100MB")
464 #d.addCallback(self.stall)
# Debug helper: fire `d` 5 seconds from now (only referenced from a
# commented-out line in do_test).
# NOTE(review): `d` is bound on a line not shown; presumably a fresh
# defer.Deferred() that is returned — confirm.
470 def stall(self, res):
472 reactor.callLater(5, d.callback, None)
# ProcessProtocol for the spawned client (see start_client). processEnded
# fires self.d, which start_client stored as self.proc_done, so
# kill_client()/tearDown can wait for the process to exit.
# NOTE(review): start_client reads pp.ended; processEnded presumably stores
# `reason` there — confirm.
476 class ClientWatcher(protocol.ProcessProtocol):
478 def outReceived(self, data):
480 def errReceived(self, data):
482 def processEnded(self, reason):
484 self.d.callback(None)
# Script entry point: pick the test mode (presumably from sys.argv[1] when
# given — the assignment is on a line not shown) and build the framework
# under ./_test_memory.
487 if __name__ == '__main__':
489 if len(sys.argv) > 1:
491 # put the logfile and stats.out in _test_memory/ . These stick around.
492 # put the nodes and other files in _test_memory/test/ . These are
493 # removed each time we run.
494 sf = SystemFramework("_test_memory", mode)