1 import os, shutil, sys, urllib, time, stat
2 from cStringIO import StringIO
3 from twisted.internet import defer, reactor, protocol, error
4 from twisted.application import service, internet
5 from twisted.web import client as tw_client
6 from allmydata import client, introducer
7 from allmydata.immutable import upload
8 from allmydata.scripts import create_node
9 from allmydata.util import fileutil, pollmixin
10 from allmydata.util.fileutil import abspath_expanduser_unicode
11 from allmydata.util.encodingutil import get_filesystem_encoding
12 from foolscap.api import Tub, fireEventually, flushEventualQueue
13 from twisted.python import log
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    """Page-getter that discards the response body as it arrives, and can
    optionally stall the transport after roughly 1MB to simulate a slow
    reader, resuming (at full speed) 10 seconds later.

    NOTE(review): this chunk is a line-numbered paste with lines dropped;
    the early-`return` statements and the `_bytes_so_far`/`stalled` class
    attributes (required by the visible code) were restored -- confirm
    against the original file.
    """
    full_speed_ahead = False  # set True once a stalled transfer resumes
    _bytes_so_far = 0         # running count of discarded body bytes
    stalled = None            # pending resume IDelayedCall while paused

    def handleResponsePart(self, data):
        # Count (and drop) the bytes; when stalling is enabled, pause the
        # producer once slightly more than 1MB has arrived, and schedule
        # a resume in 10 seconds.
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)

    def _resume_speed(self):
        # Timer callback: let the remainder of the download flow freely.
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()

    def handleResponseEnd(self):
        # Cancel any pending resume timer before delegating to the base
        # class's end-of-response handling.
        if self.stalled:
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory variant whose protocol discards the body and can
    stall mid-download (see StallableHTTPGetterDiscarder)."""
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.

    Returns the factory's Deferred, which fires when the fetch completes.
    NOTE(review): the docstring's closing quotes were dropped from this
    chunk of the file and have been restored.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # class it instantiates.
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    assert scheme == 'http'  # only plain HTTP is supported here
    reactor.connectTCP(host, port, factory)
    return factory.deferred
59 class ChildDidNotStartError(Exception):
class SystemFramework(pollmixin.PollMixin):
    """Memory-measurement harness: spins up an introducer, several local
    storage nodes, and a client-under-test subprocess, then exercises
    upload/download according to `mode` while recording the client's
    memory usage."""

    # number of local storage nodes to create
    # NOTE(review): this line was dropped from the chunk; value reconstructed.
    numnodes = 7

    def __init__(self, basedir, mode):
        # Safety: basedir is rmtree'd below, so insist it lives under cwd.
        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        self.sparent = service.MultiService()
        self.sparent.startService()
        # NOTE(review): the next two assignments were dropped from this
        # chunk and reconstructed; the Tub is required by the setOption/
        # setServiceParent calls immediately below.
        self.proc = None  # client subprocess handle, set in start_client()
        self.tub = Tub()
        self.tub.setOption("expose-remote-exception-types", False)
        self.tub.setServiceParent(self.sparent)
        # NOTE(review): mode/failed assignments reconstructed; both are
        # read elsewhere in this file (self.mode, self.failed).
        self.mode = mode      # e.g. "upload", "receive", "download-GET-slow"
        self.failed = False   # set to a Failure if the run errs
        self.keepalive_file = None
84 framelog = os.path.join(self.basedir, "driver.log")
85 log.startLogging(open(framelog, "a"), setStdout=False)
86 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
87 #logfile = open(os.path.join(self.testdir, "log"), "w")
88 #flo = log.FileLogObserver(logfile)
89 #log.startLoggingWithObserver(flo.emit, setStdout=False)
91 d.addCallback(lambda res: self.setUp())
92 d.addCallback(lambda res: self.record_initial_memusage())
93 d.addCallback(lambda res: self.make_nodes())
94 d.addCallback(lambda res: self.wait_for_client_connected())
95 d.addCallback(lambda res: self.do_test())
96 d.addBoth(self.tearDown)
108 # raiseException doesn't work for CopiedFailures
109 self.failed.raiseException()
114 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
115 d = self.make_introducer()
117 return self.start_client()
119 def _record_control_furl(control_furl):
120 self.control_furl = control_furl
121 #print "OBTAINING '%s'" % (control_furl,)
122 return self.tub.getReference(self.control_furl)
123 d.addCallback(_record_control_furl)
124 def _record_control(control_rref):
125 self.control_rref = control_rref
126 d.addCallback(_record_control)
128 #print "CLIENT READY"
130 d.addCallback(_ready)
133 def record_initial_memusage(self):
135 print "Client started (no connections yet)"
136 d = self._print_usage()
137 d.addCallback(self.stash_stats, "init")
140 def wait_for_client_connected(self):
142 print "Client connecting to other nodes.."
143 return self.control_rref.callRemote("wait_for_client_connections",
146 def tearDown(self, passthrough):
147 # the client node will shut down in a few seconds
148 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
149 log.msg("shutting down SystemTest services")
150 if self.keepalive_file and os.path.exists(self.keepalive_file):
151 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
152 log.msg("keepalive file at shutdown was %ds old" % age)
153 d = defer.succeed(None)
155 d.addCallback(lambda res: self.kill_client())
156 d.addCallback(lambda res: self.sparent.stopService())
157 d.addCallback(lambda res: flushEventualQueue())
158 def _close_statsfile(res):
159 self.statsfile.close()
160 d.addCallback(_close_statsfile)
161 d.addCallback(lambda res: passthrough)
164 def add_service(self, s):
165 s.setServiceParent(self.sparent)
168 def make_introducer(self):
169 iv_basedir = os.path.join(self.testdir, "introducer")
171 iv = introducer.IntroducerNode(basedir=iv_basedir)
172 self.introducer = self.add_service(iv)
173 d = self.introducer.when_tub_ready()
174 def _introducer_ready(res):
176 self.introducer_furl = q.introducer_url
177 d.addCallback(_introducer_ready)
180 def make_nodes(self):
182 for i in range(self.numnodes):
183 nodedir = os.path.join(self.testdir, "node%d" % i)
185 f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
187 "introducer.furl = %s\n"
190 % (self.introducer_furl,))
191 # the only tests for which we want the internal nodes to actually
192 # retain shares are the ones where somebody's going to download
194 if self.mode in ("download", "download-GET", "download-GET-slow"):
198 # for these tests, we tell the storage servers to pretend to
199 # accept shares, but really just throw them out, since we're
200 # only testing upload and not download.
201 f.write("debug_discard = true\n")
202 if self.mode in ("receive",):
203 # for this mode, the client-under-test gets all the shares,
204 # so our internal nodes can refuse requests
205 f.write("readonly = true\n")
207 c = self.add_service(client.Client(basedir=nodedir))
209 # the peers will start running, eventually they will connect to each
210 # other and the introducer
212 def touch_keepalive(self):
213 if os.path.exists(self.keepalive_file):
214 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
215 log.msg("touching keepalive file, was %ds old" % age)
216 f = open(self.keepalive_file, "w")
218 If the node notices this file at startup, it will poll every 5 seconds and
219 terminate if the file is more than 10 seconds old, or if it has been deleted.
220 If the test harness has an internal failure and neglects to kill off the node
221 itself, this helps to avoid leaving processes lying around. The contents of
222 this file are ignored.
226 def start_client(self):
227 # this returns a Deferred that fires with the client's control.furl
228 log.msg("MAKING CLIENT")
229 # self.testdir is an absolute Unicode path
230 clientdir = self.clientdir = os.path.join(self.testdir, u"client")
231 clientdir_str = clientdir.encode(get_filesystem_encoding())
233 create_node.create_node({'basedir': clientdir}, out=quiet)
234 log.msg("DONE MAKING CLIENT")
235 # now replace tahoe.cfg
236 # set webport=0 and then ask the node what port it picked.
237 f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
239 "web.port = tcp:0:interface=127.0.0.1\n"
241 "introducer.furl = %s\n"
244 % (self.introducer_furl,))
246 if self.mode in ("upload-self", "receive"):
247 # accept and store shares, to trigger the memory consumption bugs
250 # don't accept any shares
251 f.write("readonly = true\n")
252 ## also, if we do receive any shares, throw them away
253 #f.write("debug_discard = true")
254 if self.mode == "upload-self":
257 self.keepalive_file = os.path.join(clientdir,
258 "suicide_prevention_hotline")
259 # now start updating the mtime.
260 self.touch_keepalive()
261 ts = internet.TimerService(1.0, self.touch_keepalive)
262 ts.setServiceParent(self.sparent)
265 self.proc_done = pp.d = defer.Deferred()
266 logfile = os.path.join(self.basedir, "client.log")
267 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
268 env = os.environ.copy()
269 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
270 log.msg("CLIENT STARTED")
272 # now we wait for the client to get started. we're looking for the
273 # control.furl file to appear.
274 furl_file = os.path.join(clientdir, "private", "control.furl")
275 url_file = os.path.join(clientdir, "node.url")
277 if pp.ended and pp.ended.value.status != 0:
278 # the twistd process ends normally (with rc=0) if the child
279 # is successfully launched. It ends abnormally (with rc!=0)
280 # if the child cannot be launched.
281 raise ChildDidNotStartError("process ended while waiting for startup")
282 return os.path.exists(furl_file)
283 d = self.poll(_check, 0.1)
284 # once it exists, wait a moment before we read from it, just in case
285 # it hasn't finished writing the whole thing. Ideally control.furl
286 # would be created in some atomic fashion, or made non-readable until
287 # it's ready, but I can't think of an easy way to do that, and I
288 # think the chances that we'll observe a half-write are pretty low.
290 d2 = defer.Deferred()
291 reactor.callLater(0.1, d2.callback, None)
293 d.addCallback(_stall)
295 # read the node's URL
296 self.webish_url = open(url_file, "r").read().strip()
297 if self.webish_url[-1] == "/":
298 # trim trailing slash, since the rest of the code wants it gone
299 self.webish_url = self.webish_url[:-1]
300 f = open(furl_file, "r")
307 def kill_client(self):
308 # returns a Deferred that fires when the process exits. This may only
311 self.proc.signalProcess("INT")
312 except error.ProcessExitedAlready:
314 return self.proc_done
317 def create_data(self, name, size):
318 filename = os.path.join(self.testdir, name + ".data")
319 f = open(filename, "wb")
327 def stash_stats(self, stats, name):
328 self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
329 self.statsfile.flush()
330 self.stats[name] = stats['VmPeak']
332 def POST(self, urlpath, **fields):
333 url = self.webish_url + urlpath
334 sepbase = "boogabooga"
338 form.append('Content-Disposition: form-data; name="_charset"')
342 for name, value in fields.iteritems():
343 if isinstance(value, tuple):
344 filename, value = value
345 form.append('Content-Disposition: form-data; name="%s"; '
346 'filename="%s"' % (name, filename))
348 form.append('Content-Disposition: form-data; name="%s"' % name)
353 body = "\r\n".join(form) + "\r\n"
354 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
356 return tw_client.getPage(url, method="POST", postdata=body,
357 headers=headers, followRedirect=False)
359 def GET_discard(self, urlpath, stall):
360 url = self.webish_url + urlpath + "?filename=dummy-get.out"
361 return discardPage(url, stall)
363 def _print_usage(self, res=None):
364 d = self.control_rref.callRemote("get_memory_usage")
366 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
369 d.addCallback(_print)
372 def _do_upload(self, res, size, files, uris):
375 print "uploading %s" % name
376 if self.mode in ("upload", "upload-self"):
377 files[name] = self.create_data(name, size)
378 d = self.control_rref.callRemote("upload_from_file_to_uri",
379 files[name].encode("utf-8"),
380 convergence="check-memory")
382 os.remove(files[name])
386 elif self.mode == "upload-POST":
389 d = self.POST(url, t="upload", file=("%d.data" % size, data))
390 elif self.mode in ("receive",
391 "download", "download-GET", "download-GET-slow"):
392 # mode=receive: upload the data from a local peer, so that the
393 # client-under-test receives and stores the shares
395 # mode=download*: upload the data from a local peer, then have
396 # the client-under-test download it.
398 # we need to wait until the uploading node has connected to all
399 # peers, since the wait_for_client_connections() above doesn't
400 # pay attention to our self.nodes[] and their connections.
401 files[name] = self.create_data(name, size)
402 u = self.nodes[0].getServiceNamed("uploader")
403 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
404 d.addCallback(lambda res:
405 u.upload(upload.FileName(files[name],
406 convergence="check-memory")))
407 d.addCallback(lambda results: results.get_uri())
409 raise ValueError("unknown mode=%s" % self.mode)
412 print "uploaded %s" % name
413 d.addCallback(_complete)
416 def _do_download(self, res, size, uris):
417 if self.mode not in ("download", "download-GET", "download-GET-slow"):
420 print "downloading %s" % name
423 if self.mode == "download":
424 d = self.control_rref.callRemote("download_from_uri_to_file",
426 elif self.mode == "download-GET":
427 url = "/uri/%s" % uri
428 d = self.GET_discard(urllib.quote(url), stall=False)
429 elif self.mode == "download-GET-slow":
430 url = "/uri/%s" % uri
431 d = self.GET_discard(urllib.quote(url), stall=True)
434 print "downloaded %s" % name
436 d.addCallback(_complete)
440 #print "CLIENT STARTED"
441 #print "FURL", self.control_furl
442 #print "RREF", self.control_rref
444 kB = 1000; MB = 1000*1000
448 d = self._print_usage()
449 d.addCallback(self.stash_stats, "0B")
452 d.addCallback(self._do_upload, 10*kB+i, files, uris)
453 d.addCallback(self._do_download, 10*kB+i, uris)
454 d.addCallback(self._print_usage)
455 d.addCallback(self.stash_stats, "10kB")
458 d.addCallback(self._do_upload, 10*MB+i, files, uris)
459 d.addCallback(self._do_download, 10*MB+i, uris)
460 d.addCallback(self._print_usage)
461 d.addCallback(self.stash_stats, "10MB")
464 d.addCallback(self._do_upload, 50*MB+i, files, uris)
465 d.addCallback(self._do_download, 50*MB+i, uris)
466 d.addCallback(self._print_usage)
467 d.addCallback(self.stash_stats, "50MB")
470 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
471 # d.addCallback(self._do_download, 100*MB+i, uris)
472 # d.addCallback(self._print_usage)
473 #d.addCallback(self.stash_stats, "100MB")
475 #d.addCallback(self.stall)
481 def stall(self, res):
483 reactor.callLater(5, d.callback, None)
487 class ClientWatcher(protocol.ProcessProtocol):
489 def outReceived(self, data):
491 def errReceived(self, data):
493 def processEnded(self, reason):
495 self.d.callback(None)
if __name__ == '__main__':
    # Mode defaults to "upload"; an optional argv[1] overrides it.
    # NOTE(review): the two mode-assignment lines and the final sf.run()
    # were dropped from this chunk and reconstructed.
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()