3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer, upload
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    """Page getter that discards the response body and can stall the pipe.

    Simulates a slow/stalled HTTP download client: once more than
    1e6+100 bytes have arrived, the transport is paused and resumed ten
    seconds later, after which data flows at full speed.  Stalling is
    gated on the factory's ``do_stall`` flag.

    NOTE(review): this excerpt elides several original lines (the bodies
    of the first two ``if`` guards, presumably early ``return``s, and an
    inner guard around the pause); the visible statements are preserved
    unchanged — confirm against the full file.
    """
    # Set True once the stall has ended so we never pause again.
    full_speed_ahead = False

    def handleResponsePart(self, data):
        # Count body bytes; the data itself is thrown away.
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
        if self.full_speed_ahead:
        if self._bytes_so_far > 1e6+100:
        # Pause the producer and schedule a resume ten seconds from now.
        self.transport.pauseProducing()
        self.stalled = reactor.callLater(10.0, self._resume_speed)

    def _resume_speed(self):
        # Fired by the callLater above: stop stalling for good.
        self.full_speed_ahead = True
        self.transport.resumeProducing()

    def handleResponseEnd(self):
        # Delegate end-of-response handling to the standard getter.
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose protocol discards page data and can stall."""
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching *url*, discarding the body as it arrives.

    With ``stall=True`` the connection is paused after roughly the first
    1MB and resumed (still discarding) ten seconds later.  Returns the
    factory's Deferred, which fires when the fetch completes.
    """
    # Adapted from twisted.web.client.getPage: getPage provides no hook
    # for substituting the HTTPClientFactory, so its setup is repeated
    # here with our stallable factory instead.
    scheme, host, port, path = tw_client._parse(url)
    assert scheme == 'http'
    page_factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    page_factory.do_stall = stall
    reactor.connectTCP(host, port, page_factory)
    return page_factory.deferred
class SystemFramework(testutil.PollMixin):
    """Harness driving a memory-consumption test of a Tahoe client node.

    Builds a throwaway grid under ``basedir/test`` (an introducer, several
    in-process helper nodes, and a spawned-via-twistd client-under-test),
    then — depending on ``mode`` — uploads and/or downloads files of
    increasing sizes while recording the client's VmPeak into
    ``basedir/stats.out`` after each step.

    NOTE(review): this excerpt elides many original lines, including some
    ``def`` lines, ``else:`` branches, and argument-continuation lines.
    Every statement that IS visible is preserved unchanged; elisions that
    affect readability are flagged inline.
    """

    def __init__(self, basedir, mode):
        # basedir must live under the current directory: basedir/test is
        # deleted wholesale at the start of each run.
        self.basedir = basedir = os.path.abspath(basedir)
        if not basedir.startswith(os.path.abspath(".")):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        # parent MultiService for everything we start; stopped in tearDown
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.tub = foolscap.Tub()
        self.tub.setServiceParent(self.sparent)
        self.keepalive_file = None

    # NOTE(review): the ``def`` line of the next method (presumably the
    # main ``run()`` entry point) is elided in this excerpt.
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        # Run the whole test as one Deferred chain: set up, measure the
        # baseline, build the grid, wait for connections, run the test,
        # then tear down regardless of success or failure.
        d = eventual.fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        # raiseException doesn't work for CopiedFailures
        self.failed.raiseException()

    # NOTE(review): setUp()'s ``def`` line is elided; these statements
    # open the stats file, build the introducer, start the client, and
    # turn its control.furl into a live remote reference.
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
            return self.start_client()
        def _record_control_furl(control_furl):
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            # exchange the furl for a live remote reference via our Tub
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
            #print "CLIENT READY"
        d.addCallback(_ready)

    def record_initial_memusage(self):
        """Snapshot the client's memory usage before any connections exist."""
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")

    def wait_for_client_connected(self):
        """Return a Deferred that fires once the client sees its peers."""
        print "Client connecting to other nodes.."
        # NOTE(review): the argument-continuation line (the expected
        # connection count) is elided in this excerpt.
        return self.control_rref.callRemote("wait_for_client_connections",

    def tearDown(self, passthrough):
        """Stop the client process and our services; pass the result through."""
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: eventual.flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        # hand the original result (or failure) back to the chain
        d.addCallback(lambda res: passthrough)

    def add_service(self, s):
        # attach s to our parent service so tearDown stops it too
        s.setServiceParent(self.sparent)

    def make_introducer(self):
        """Create and start the introducer node; record its furl."""
        iv_basedir = os.path.join(self.testdir, "introducer")
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            # NOTE(review): the binding of ``q`` is elided in this
            # excerpt — presumably q = self.introducer; confirm.
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)

    def make_nodes(self):
        """Create self.numnodes in-process nodes pointed at our introducer."""
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            f = open(os.path.join(nodedir, "introducer.furl"), "w")
            f.write(self.introducer_furl)
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            if self.mode in ("download", "download-GET", "download-GET-slow"):
                # NOTE(review): an elided branch (likely the download
                # "retain shares" pass plus an ``else:``) sits between the
                # test above and the lines below — confirm against the
                # full file before reading this nesting literally.
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
                f.write("no_storage\n")
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f = open(os.path.join(nodedir, "sizelimit"), "w")
            c = self.add_service(client.Client(basedir=nodedir))
        # the peers will start running, eventually they will connect to each
        # other and the introducer

    def touch_keepalive(self):
        """Refresh the keepalive file's mtime so the node keeps running."""
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        # NOTE(review): the ``f.write(\"\"\"...`` opener for the following
        # text (a triple-quoted string literal in the full file) is elided
        # in this excerpt; the text below is that string's content.
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.

    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        clientdir = self.clientdir = os.path.join(self.testdir, "client")
        create_node.create_client(clientdir, {}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        # point the client at our introducer
        f = open(os.path.join(clientdir, "introducer.furl"), "w")
        f.write(self.introducer_furl + "\n")
        f = open(os.path.join(clientdir, "webport"), "w")
        # TODO: ideally we would set webport=0 and then ask the node what
        # port it picked. But at the moment it is not convenient to do this,
        # so we just pick a relatively unique one.
        webport = max(os.getpid(), 2000)
        f.write("tcp:%d:interface=127.0.0.1\n" % webport)
        self.webish_url = "http://localhost:%d" % webport
        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
            # NOTE(review): an elided ``else:`` likely precedes the three
            # lines below (the read-only branch) — confirm.
            # don't accept any shares
            f = open(os.path.join(clientdir, "readonly_storage"), "w")
            ## also, if we do receive any shares, throw them away
            #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
            #f.write("no_storage\n")
        if self.mode == "upload-self":
            # keepalive file: the node suicides if this goes stale, so a
            # harness crash doesn't leave orphan processes behind
            self.keepalive_file = os.path.join(clientdir,
                                               "suicide_prevention_hotline")
            # now start updating the mtime.
            self.touch_keepalive()
            ts = internet.TimerService(1.0, self.touch_keepalive)
            ts.setServiceParent(self.sparent)
        # spawn the client node as a child twistd process and watch it
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
        log.msg("CLIENT STARTED")
        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        # NOTE(review): the ``def _check():`` line is elided above the
        # poll predicate that follows.
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise RuntimeError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
        d.addCallback(_stall)
            f = open(furl_file, "r")

    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # NOTE(review): the enclosing ``try:`` line is elided here.
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
        return self.proc_done

    def create_data(self, name, size):
        """Create a size-byte data file in the testdir (returns its path)."""
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")
        # NOTE(review): the loop that fills the file is elided here.

    def stash_stats(self, stats, name):
        # append one "mode name: VmPeak" line per measurement and keep it
        # in memory as well
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

    def POST(self, urlpath, **fields):
        """Send a multipart/form-data POST to the client's web port.

        Tuple-valued fields are treated as (filename, data) file uploads.
        NOTE(review): the initialization of ``form`` and the closing of
        the multipart body/headers are elided in this excerpt.
        """
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        form.append('Content-Disposition: form-data; name="_charset"')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
                # NOTE(review): an elided ``else:`` likely precedes the
                # plain-field line below — confirm.
                form.append('Content-Disposition: form-data; name="%s"' % name)
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        # GET through the discarding client; ?filename= just hints a
        # save-as name to the node's web interface
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

    def _print_usage(self, res=None):
        """Ask the client (over the control rref) for memory usage, print it."""
        d = self.control_rref.callRemote("get_memory_usage")
        # NOTE(review): the ``def _print(stats):`` line and the format
        # continuation are elided around the print below.
            print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
        d.addCallback(_print)

    def _do_upload(self, res, size, files, uris):
        """Upload one size-byte file using the path selected by self.mode."""
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            # hand the client a local file and let it upload via control.furl
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
            os.remove(files[name])
        elif self.mode == "upload-POST":
            # push the data through the node's web interface instead
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res: u.upload(upload.FileName(files[name])))
            # NOTE(review): an elided ``else:`` likely precedes the
            # unknown-mode guard below — confirm.
            raise RuntimeError("unknown mode=%s" % self.mode)
            print "uploaded %s" % name
        d.addCallback(_complete)

    def _do_download(self, res, size, uris):
        """Download the previously-uploaded file (download modes only)."""
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            print "downloading %s" % name
        if self.mode == "download":
            # fetch through the control interface into a local file
            d = self.control_rref.callRemote("download_from_uri_to_file",
        elif self.mode == "download-GET":
            # fetch through the web interface, discarding the body
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            # same, but stall the pipe after ~1MB to simulate a slow reader
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)
            print "downloaded %s" % name
        d.addCallback(_complete)

    # NOTE(review): the ``def do_test(self):`` line is elided above this
    # body, which measures memory after uploads/downloads of increasing
    # sizes (10kB, 10MB, 50MB; 100MB left commented out).
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        kB = 1000; MB = 1000*1000
        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")
        d.addCallback(self._do_upload, 10*kB+i, files, uris)
        d.addCallback(self._do_download, 10*kB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")
        d.addCallback(self._do_upload, 10*MB+i, files, uris)
        d.addCallback(self._do_download, 10*MB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")
        d.addCallback(self._do_upload, 50*MB+i, files, uris)
        d.addCallback(self._do_download, 50*MB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")
        # d.addCallback(self._do_upload, 100*MB+i, files, uris)
        # d.addCallback(self._do_download, 100*MB+i, uris)
        # d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")
        #d.addCallback(self.stall)

    def stall(self, res):
        """Return a Deferred that fires after a five-second delay."""
        reactor.callLater(5, d.callback, None)
class ClientWatcher(protocol.ProcessProtocol):
    """ProcessProtocol watching the spawned client-under-test process.

    NOTE(review): this excerpt elides the stdout/stderr handler bodies
    and some attribute initialization; visible lines are unchanged.
    """
    def outReceived(self, data):
    def errReceived(self, data):
    def processEnded(self, reason):
        # fire self.d (assigned by start_client as self.proc_done) so the
        # harness knows the child process has exited
        self.d.callback(None)
485 if __name__ == '__main__':
487 if len(sys.argv) > 1:
489 # put the logfile and stats.out in _test_memory/ . These stick around.
490 # put the nodes and other files in _test_memory/test/ . These are
491 # removed each time we run.
492 sf = SystemFramework("_test_memory", mode)