3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
12 from foolscap.api import Tub, fireEventually, flushEventualQueue
13 from twisted.python import log
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16 full_speed_ahead = False
19 def handleResponsePart(self, data):
20 self._bytes_so_far += len(data)
21 if not self.factory.do_stall:
23 if self.full_speed_ahead:
25 if self._bytes_so_far > 1e6+100:
28 self.transport.pauseProducing()
29 self.stalled = reactor.callLater(10.0, self._resume_speed)
30 def _resume_speed(self):
33 self.full_speed_ahead = True
34 self.transport.resumeProducing()
35 def handleResponseEnd(self):
40 return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory variant whose protocol class is
    StallableHTTPGetterDiscarder."""

    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.

    url: the page to fetch. Only the 'http' scheme is accepted; any other
         scheme raises AssertionError before a connection is attempted.
    stall: stored on the factory as 'do_stall', which the protocol consults
           while response data arrives.
    args, kwargs: passed through to StallableDiscardingHTTPClientFactory.

    Returns the factory's Deferred.
    """
    # Adapted from twisted.web.client.getPage. We can't just wrap or subclass
    # it because it provides no way to override the HTTPClientFactory.
    scheme, host, port, path = tw_client._parse(url)
    # Validate before building the factory, and raise explicitly rather than
    # with an 'assert' statement so the check survives 'python -O'.
    if scheme != 'http':
        raise AssertionError("discardPage only handles http URLs, not %r"
                             % (scheme,))
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
class ChildDidNotStartError(Exception):
    """Raised while polling for the client's control.furl when the child
    process has already ended with a non-zero exit status."""
62 class SystemFramework(pollmixin.PollMixin):
65 def __init__(self, basedir, mode):
66 self.basedir = basedir = os.path.abspath(basedir)
67 if not basedir.startswith(os.path.abspath(".")):
68 raise AssertionError("safety issue: basedir must be a subdir")
69 self.testdir = testdir = os.path.join(basedir, "test")
70 if os.path.exists(testdir):
71 shutil.rmtree(testdir)
72 fileutil.make_dirs(testdir)
73 self.sparent = service.MultiService()
74 self.sparent.startService()
77 self.tub.setOption("expose-remote-exception-types", False)
78 self.tub.setServiceParent(self.sparent)
81 self.keepalive_file = None
84 framelog = os.path.join(self.basedir, "driver.log")
85 log.startLogging(open(framelog, "a"), setStdout=False)
86 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
87 #logfile = open(os.path.join(self.testdir, "log"), "w")
88 #flo = log.FileLogObserver(logfile)
89 #log.startLoggingWithObserver(flo.emit, setStdout=False)
91 d.addCallback(lambda res: self.setUp())
92 d.addCallback(lambda res: self.record_initial_memusage())
93 d.addCallback(lambda res: self.make_nodes())
94 d.addCallback(lambda res: self.wait_for_client_connected())
95 d.addCallback(lambda res: self.do_test())
96 d.addBoth(self.tearDown)
108 # raiseException doesn't work for CopiedFailures
109 self.failed.raiseException()
114 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
115 d = self.make_introducer()
117 return self.start_client()
119 def _record_control_furl(control_furl):
120 self.control_furl = control_furl
121 #print "OBTAINING '%s'" % (control_furl,)
122 return self.tub.getReference(self.control_furl)
123 d.addCallback(_record_control_furl)
124 def _record_control(control_rref):
125 self.control_rref = control_rref
126 d.addCallback(_record_control)
128 #print "CLIENT READY"
130 d.addCallback(_ready)
133 def record_initial_memusage(self):
135 print "Client started (no connections yet)"
136 d = self._print_usage()
137 d.addCallback(self.stash_stats, "init")
140 def wait_for_client_connected(self):
142 print "Client connecting to other nodes.."
143 return self.control_rref.callRemote("wait_for_client_connections",
146 def tearDown(self, passthrough):
147 # the client node will shut down in a few seconds
148 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
149 log.msg("shutting down SystemTest services")
150 if self.keepalive_file and os.path.exists(self.keepalive_file):
151 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
152 log.msg("keepalive file at shutdown was %ds old" % age)
153 d = defer.succeed(None)
155 d.addCallback(lambda res: self.kill_client())
156 d.addCallback(lambda res: self.sparent.stopService())
157 d.addCallback(lambda res: flushEventualQueue())
158 def _close_statsfile(res):
159 self.statsfile.close()
160 d.addCallback(_close_statsfile)
161 d.addCallback(lambda res: passthrough)
def add_service(self, s):
    """Attach service 's' to self.sparent and return 's'.

    The service is handed back because callers (make_introducer and
    make_nodes) assign the result and then use it as the service object.
    """
    s.setServiceParent(self.sparent)
    return s
168 def make_introducer(self):
169 iv_basedir = os.path.join(self.testdir, "introducer")
171 iv = introducer.IntroducerNode(basedir=iv_basedir)
172 self.introducer = self.add_service(iv)
173 d = self.introducer.when_tub_ready()
174 def _introducer_ready(res):
176 self.introducer_furl = q.introducer_url
177 d.addCallback(_introducer_ready)
180 def make_nodes(self):
182 for i in range(self.numnodes):
183 nodedir = os.path.join(self.testdir, "node%d" % i)
185 f = open(os.path.join(nodedir, "introducer.furl"), "w")
186 f.write(self.introducer_furl)
188 # the only tests for which we want the internal nodes to actually
189 # retain shares are the ones where somebody's going to download
191 if self.mode in ("download", "download-GET", "download-GET-slow"):
195 # for these tests, we tell the storage servers to pretend to
196 # accept shares, but really just throw them out, since we're
197 # only testing upload and not download.
198 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
199 f.write("no_storage\n")
201 if self.mode in ("receive",):
202 # for this mode, the client-under-test gets all the shares,
203 # so our internal nodes can refuse requests
204 f = open(os.path.join(nodedir, "readonly_storage"), "w")
207 c = self.add_service(client.Client(basedir=nodedir))
209 # the peers will start running, eventually they will connect to each
210 # other and the introducer
212 def touch_keepalive(self):
213 if os.path.exists(self.keepalive_file):
214 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
215 log.msg("touching keepalive file, was %ds old" % age)
216 f = open(self.keepalive_file, "w")
218 If the node notices this file at startup, it will poll every 5 seconds and
219 terminate if the file is more than 10 seconds old, or if it has been deleted.
220 If the test harness has an internal failure and neglects to kill off the node
221 itself, this helps to avoid leaving processes lying around. The contents of
222 this file are ignored.
226 def start_client(self):
227 # this returns a Deferred that fires with the client's control.furl
228 log.msg("MAKING CLIENT")
229 clientdir = self.clientdir = os.path.join(self.testdir, "client")
231 create_node.create_node(clientdir, {}, out=quiet)
232 log.msg("DONE MAKING CLIENT")
233 f = open(os.path.join(clientdir, "introducer.furl"), "w")
234 f.write(self.introducer_furl + "\n")
237 # set webport=0 and then ask the node what port it picked.
238 f = open(os.path.join(clientdir, "webport"), "w")
239 f.write("tcp:0:interface=127.0.0.1\n")
242 if self.mode in ("upload-self", "receive"):
243 # accept and store shares, to trigger the memory consumption bugs
246 # don't accept any shares
247 f = open(os.path.join(clientdir, "readonly_storage"), "w")
250 ## also, if we do receive any shares, throw them away
251 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
252 #f.write("no_storage\n")
254 if self.mode == "upload-self":
256 self.keepalive_file = os.path.join(clientdir,
257 "suicide_prevention_hotline")
258 # now start updating the mtime.
259 self.touch_keepalive()
260 ts = internet.TimerService(1.0, self.touch_keepalive)
261 ts.setServiceParent(self.sparent)
264 self.proc_done = pp.d = defer.Deferred()
265 logfile = os.path.join(self.basedir, "client.log")
266 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
267 env = os.environ.copy()
268 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
269 log.msg("CLIENT STARTED")
271 # now we wait for the client to get started. we're looking for the
272 # control.furl file to appear.
273 furl_file = os.path.join(clientdir, "private", "control.furl")
274 url_file = os.path.join(clientdir, "node.url")
276 if pp.ended and pp.ended.value.status != 0:
277 # the twistd process ends normally (with rc=0) if the child
278 # is successfully launched. It ends abnormally (with rc!=0)
279 # if the child cannot be launched.
280 raise ChildDidNotStartError("process ended while waiting for startup")
281 return os.path.exists(furl_file)
282 d = self.poll(_check, 0.1)
283 # once it exists, wait a moment before we read from it, just in case
284 # it hasn't finished writing the whole thing. Ideally control.furl
285 # would be created in some atomic fashion, or made non-readable until
286 # it's ready, but I can't think of an easy way to do that, and I
287 # think the chances that we'll observe a half-write are pretty low.
289 d2 = defer.Deferred()
290 reactor.callLater(0.1, d2.callback, None)
292 d.addCallback(_stall)
294 # read the node's URL
295 self.webish_url = open(url_file, "r").read().strip()
296 if self.webish_url[-1] == "/":
297 # trim trailing slash, since the rest of the code wants it gone
298 self.webish_url = self.webish_url[:-1]
299 f = open(furl_file, "r")
def kill_client(self):
    """Send SIGINT to the client process and return self.proc_done.

    self.proc_done is the Deferred that fires when the process exits.
    NOTE(review): the original comment goes on to state a restriction on
    calling this ("This may only ..."), but the rest of that sentence is
    not visible here -- confirm against the full file.
    """
    try:
        self.proc.signalProcess("INT")
    except error.ProcessExitedAlready:
        # The process is already gone, so there is nothing left to signal.
        pass
    return self.proc_done
316 def create_data(self, name, size):
317 filename = os.path.join(self.testdir, name + ".data")
318 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the 'VmPeak' entry of 'stats' under the label 'name'.

    Appends a "<mode> <name>: <VmPeak>" line to self.statsfile, flushes it,
    and stores the same value in self.stats[name].
    """
    peak = stats['VmPeak']
    line = "%s %s: %d\n" % (self.mode, name, peak)
    out = self.statsfile
    out.write(line)
    out.flush()
    self.stats[name] = peak
331 def POST(self, urlpath, **fields):
332 url = self.webish_url + urlpath
333 sepbase = "boogabooga"
337 form.append('Content-Disposition: form-data; name="_charset"')
341 for name, value in fields.iteritems():
342 if isinstance(value, tuple):
343 filename, value = value
344 form.append('Content-Disposition: form-data; name="%s"; '
345 'filename="%s"' % (name, filename))
347 form.append('Content-Disposition: form-data; name="%s"' % name)
352 body = "\r\n".join(form) + "\r\n"
353 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
355 return tw_client.getPage(url, method="POST", postdata=body,
356 headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
    """Fetch 'urlpath' from the client's web server through discardPage().

    The request URL is self.webish_url + urlpath with a fixed
    "?filename=dummy-get.out" query appended; 'stall' is passed straight
    through to discardPage, whose Deferred is returned.
    """
    base = self.webish_url
    target = base + urlpath
    target = target + "?filename=dummy-get.out"
    return discardPage(target, stall)
362 def _print_usage(self, res=None):
363 d = self.control_rref.callRemote("get_memory_usage")
365 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
368 d.addCallback(_print)
371 def _do_upload(self, res, size, files, uris):
374 print "uploading %s" % name
375 if self.mode in ("upload", "upload-self"):
376 files[name] = self.create_data(name, size)
377 d = self.control_rref.callRemote("upload_from_file_to_uri",
378 files[name], convergence="check-memory convergence string")
380 os.remove(files[name])
384 elif self.mode == "upload-POST":
387 d = self.POST(url, t="upload", file=("%d.data" % size, data))
388 elif self.mode in ("receive",
389 "download", "download-GET", "download-GET-slow"):
390 # mode=receive: upload the data from a local peer, so that the
391 # client-under-test receives and stores the shares
393 # mode=download*: upload the data from a local peer, then have
394 # the client-under-test download it.
396 # we need to wait until the uploading node has connected to all
397 # peers, since the wait_for_client_connections() above doesn't
398 # pay attention to our self.nodes[] and their connections.
399 files[name] = self.create_data(name, size)
400 u = self.nodes[0].getServiceNamed("uploader")
401 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
402 d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
403 d.addCallback(lambda results: results.uri)
405 raise ValueError("unknown mode=%s" % self.mode)
408 print "uploaded %s" % name
409 d.addCallback(_complete)
412 def _do_download(self, res, size, uris):
413 if self.mode not in ("download", "download-GET", "download-GET-slow"):
416 print "downloading %s" % name
419 if self.mode == "download":
420 d = self.control_rref.callRemote("download_from_uri_to_file",
422 elif self.mode == "download-GET":
423 url = "/uri/%s" % uri
424 d = self.GET_discard(urllib.quote(url), stall=False)
425 elif self.mode == "download-GET-slow":
426 url = "/uri/%s" % uri
427 d = self.GET_discard(urllib.quote(url), stall=True)
430 print "downloaded %s" % name
432 d.addCallback(_complete)
436 #print "CLIENT STARTED"
437 #print "FURL", self.control_furl
438 #print "RREF", self.control_rref
440 kB = 1000; MB = 1000*1000
444 d = self._print_usage()
445 d.addCallback(self.stash_stats, "0B")
448 d.addCallback(self._do_upload, 10*kB+i, files, uris)
449 d.addCallback(self._do_download, 10*kB+i, uris)
450 d.addCallback(self._print_usage)
451 d.addCallback(self.stash_stats, "10kB")
454 d.addCallback(self._do_upload, 10*MB+i, files, uris)
455 d.addCallback(self._do_download, 10*MB+i, uris)
456 d.addCallback(self._print_usage)
457 d.addCallback(self.stash_stats, "10MB")
460 d.addCallback(self._do_upload, 50*MB+i, files, uris)
461 d.addCallback(self._do_download, 50*MB+i, uris)
462 d.addCallback(self._print_usage)
463 d.addCallback(self.stash_stats, "50MB")
466 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
467 # d.addCallback(self._do_download, 100*MB+i, uris)
468 # d.addCallback(self._print_usage)
469 #d.addCallback(self.stash_stats, "100MB")
471 #d.addCallback(self.stall)
def stall(self, res):
    """Return a Deferred that fires with None five seconds from now.

    'res' is ignored; the parameter exists so this method can be used
    directly as a Deferred callback.
    """
    d = defer.Deferred()
    reactor.callLater(5, d.callback, None)
    return d
483 class ClientWatcher(protocol.ProcessProtocol):
485 def outReceived(self, data):
487 def errReceived(self, data):
489 def processEnded(self, reason):
491 self.d.callback(None)
494 if __name__ == '__main__':
496 if len(sys.argv) > 1:
498 # put the logfile and stats.out in _test_memory/ . These stick around.
499 # put the nodes and other files in _test_memory/test/ . These are
500 # removed each time we run.
501 sf = SystemFramework("_test_memory", mode)