3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
12 from allmydata.util.fileutil import abspath_expanduser_unicode
13 from allmydata.util.encodingutil import get_filesystem_encoding
14 from foolscap.api import Tub, fireEventually, flushEventualQueue
15 from twisted.python import log
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    """HTTP page getter that discards the body and can stall mid-download.

    When the factory's ``do_stall`` flag is set, the transport is paused
    once slightly more than 1MB has arrived, and resumed 10 seconds
    later, simulating a slow HTTP client for memory-consumption tests.
    """
    full_speed_ahead = False  # once True, never stall again
    _bytes_so_far = 0         # running count of response-body bytes seen
    stalled = None            # pending IDelayedCall for the resume, or None

    def handleResponsePart(self, data):
        # Count (and discard) the body. If stalling is enabled and we have
        # seen just over 1MB, pause the producer once and schedule a resume.
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                log.msg("stalling")
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)

    def _resume_speed(self):
        # Timer fired: resume the transfer and run at full speed from now on.
        log.msg("resume_speed")
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()

    def handleResponseEnd(self):
        # If the response completes while we are stalled, cancel the
        # pending resume timer so it does not fire on a dead transport.
        if self.stalled:
            log.msg("cancel stall")
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose protocol discards the body and can stall.

    Callers set ``do_stall`` on the factory instance (see discardPage)
    before connecting; the protocol consults it per response chunk.
    """
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.

    Returns the factory's Deferred, which fires when the fetch finishes.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred
class ChildDidNotStartError(Exception):
    """Raised when the spawned twistd client process exits with a nonzero
    status before its control.furl file appears (i.e. launch failed)."""
64 class SystemFramework(pollmixin.PollMixin):
67 def __init__(self, basedir, mode):
68 self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
69 if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
70 raise AssertionError("safety issue: basedir must be a subdir")
71 self.testdir = testdir = os.path.join(basedir, "test")
72 if os.path.exists(testdir):
73 shutil.rmtree(testdir)
74 fileutil.make_dirs(testdir)
75 self.sparent = service.MultiService()
76 self.sparent.startService()
79 self.tub.setOption("expose-remote-exception-types", False)
80 self.tub.setServiceParent(self.sparent)
83 self.keepalive_file = None
86 framelog = os.path.join(self.basedir, "driver.log")
87 log.startLogging(open(framelog, "a"), setStdout=False)
88 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
89 #logfile = open(os.path.join(self.testdir, "log"), "w")
90 #flo = log.FileLogObserver(logfile)
91 #log.startLoggingWithObserver(flo.emit, setStdout=False)
93 d.addCallback(lambda res: self.setUp())
94 d.addCallback(lambda res: self.record_initial_memusage())
95 d.addCallback(lambda res: self.make_nodes())
96 d.addCallback(lambda res: self.wait_for_client_connected())
97 d.addCallback(lambda res: self.do_test())
98 d.addBoth(self.tearDown)
110 # raiseException doesn't work for CopiedFailures
111 self.failed.raiseException()
116 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
117 d = self.make_introducer()
119 return self.start_client()
121 def _record_control_furl(control_furl):
122 self.control_furl = control_furl
123 #print "OBTAINING '%s'" % (control_furl,)
124 return self.tub.getReference(self.control_furl)
125 d.addCallback(_record_control_furl)
126 def _record_control(control_rref):
127 self.control_rref = control_rref
128 d.addCallback(_record_control)
130 #print "CLIENT READY"
132 d.addCallback(_ready)
135 def record_initial_memusage(self):
137 print "Client started (no connections yet)"
138 d = self._print_usage()
139 d.addCallback(self.stash_stats, "init")
142 def wait_for_client_connected(self):
144 print "Client connecting to other nodes.."
145 return self.control_rref.callRemote("wait_for_client_connections",
148 def tearDown(self, passthrough):
149 # the client node will shut down in a few seconds
150 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
151 log.msg("shutting down SystemTest services")
152 if self.keepalive_file and os.path.exists(self.keepalive_file):
153 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
154 log.msg("keepalive file at shutdown was %ds old" % age)
155 d = defer.succeed(None)
157 d.addCallback(lambda res: self.kill_client())
158 d.addCallback(lambda res: self.sparent.stopService())
159 d.addCallback(lambda res: flushEventualQueue())
160 def _close_statsfile(res):
161 self.statsfile.close()
162 d.addCallback(_close_statsfile)
163 d.addCallback(lambda res: passthrough)
166 def add_service(self, s):
167 s.setServiceParent(self.sparent)
170 def make_introducer(self):
171 iv_basedir = os.path.join(self.testdir, "introducer")
173 iv = introducer.IntroducerNode(basedir=iv_basedir)
174 self.introducer = self.add_service(iv)
175 d = self.introducer.when_tub_ready()
176 def _introducer_ready(res):
178 self.introducer_furl = q.introducer_url
179 d.addCallback(_introducer_ready)
182 def make_nodes(self):
184 for i in range(self.numnodes):
185 nodedir = os.path.join(self.testdir, "node%d" % i)
187 f = open(os.path.join(nodedir, "introducer.furl"), "w")
188 f.write(self.introducer_furl)
190 # the only tests for which we want the internal nodes to actually
191 # retain shares are the ones where somebody's going to download
193 if self.mode in ("download", "download-GET", "download-GET-slow"):
197 # for these tests, we tell the storage servers to pretend to
198 # accept shares, but really just throw them out, since we're
199 # only testing upload and not download.
200 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
201 f.write("no_storage\n")
203 if self.mode in ("receive",):
204 # for this mode, the client-under-test gets all the shares,
205 # so our internal nodes can refuse requests
206 f = open(os.path.join(nodedir, "readonly_storage"), "w")
209 c = self.add_service(client.Client(basedir=nodedir))
211 # the peers will start running, eventually they will connect to each
212 # other and the introducer
214 def touch_keepalive(self):
215 if os.path.exists(self.keepalive_file):
216 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
217 log.msg("touching keepalive file, was %ds old" % age)
218 f = open(self.keepalive_file, "w")
220 If the node notices this file at startup, it will poll every 5 seconds and
221 terminate if the file is more than 10 seconds old, or if it has been deleted.
222 If the test harness has an internal failure and neglects to kill off the node
223 itself, this helps to avoid leaving processes lying around. The contents of
224 this file are ignored.
228 def start_client(self):
229 # this returns a Deferred that fires with the client's control.furl
230 log.msg("MAKING CLIENT")
231 # self.testdir is an absolute Unicode path
232 clientdir = self.clientdir = os.path.join(self.testdir, u"client")
233 clientdir_str = clientdir.encode(get_filesystem_encoding())
235 create_node.create_node(clientdir, {}, out=quiet)
236 log.msg("DONE MAKING CLIENT")
237 f = open(os.path.join(clientdir, "introducer.furl"), "w")
238 f.write(self.introducer_furl + "\n")
241 # set webport=0 and then ask the node what port it picked.
242 f = open(os.path.join(clientdir, "webport"), "w")
243 f.write("tcp:0:interface=127.0.0.1\n")
246 if self.mode in ("upload-self", "receive"):
247 # accept and store shares, to trigger the memory consumption bugs
250 # don't accept any shares
251 f = open(os.path.join(clientdir, "readonly_storage"), "w")
254 ## also, if we do receive any shares, throw them away
255 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
256 #f.write("no_storage\n")
258 if self.mode == "upload-self":
260 self.keepalive_file = os.path.join(clientdir,
261 "suicide_prevention_hotline")
262 # now start updating the mtime.
263 self.touch_keepalive()
264 ts = internet.TimerService(1.0, self.touch_keepalive)
265 ts.setServiceParent(self.sparent)
268 self.proc_done = pp.d = defer.Deferred()
269 logfile = os.path.join(self.basedir, "client.log")
270 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
271 env = os.environ.copy()
272 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
273 log.msg("CLIENT STARTED")
275 # now we wait for the client to get started. we're looking for the
276 # control.furl file to appear.
277 furl_file = os.path.join(clientdir, "private", "control.furl")
278 url_file = os.path.join(clientdir, "node.url")
280 if pp.ended and pp.ended.value.status != 0:
281 # the twistd process ends normally (with rc=0) if the child
282 # is successfully launched. It ends abnormally (with rc!=0)
283 # if the child cannot be launched.
284 raise ChildDidNotStartError("process ended while waiting for startup")
285 return os.path.exists(furl_file)
286 d = self.poll(_check, 0.1)
287 # once it exists, wait a moment before we read from it, just in case
288 # it hasn't finished writing the whole thing. Ideally control.furl
289 # would be created in some atomic fashion, or made non-readable until
290 # it's ready, but I can't think of an easy way to do that, and I
291 # think the chances that we'll observe a half-write are pretty low.
293 d2 = defer.Deferred()
294 reactor.callLater(0.1, d2.callback, None)
296 d.addCallback(_stall)
298 # read the node's URL
299 self.webish_url = open(url_file, "r").read().strip()
300 if self.webish_url[-1] == "/":
301 # trim trailing slash, since the rest of the code wants it gone
302 self.webish_url = self.webish_url[:-1]
303 f = open(furl_file, "r")
310 def kill_client(self):
311 # returns a Deferred that fires when the process exits. This may only
314 self.proc.signalProcess("INT")
315 except error.ProcessExitedAlready:
317 return self.proc_done
320 def create_data(self, name, size):
321 filename = os.path.join(self.testdir, name + ".data")
322 f = open(filename, "wb")
330 def stash_stats(self, stats, name):
331 self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
332 self.statsfile.flush()
333 self.stats[name] = stats['VmPeak']
335 def POST(self, urlpath, **fields):
336 url = self.webish_url + urlpath
337 sepbase = "boogabooga"
341 form.append('Content-Disposition: form-data; name="_charset"')
345 for name, value in fields.iteritems():
346 if isinstance(value, tuple):
347 filename, value = value
348 form.append('Content-Disposition: form-data; name="%s"; '
349 'filename="%s"' % (name, filename))
351 form.append('Content-Disposition: form-data; name="%s"' % name)
356 body = "\r\n".join(form) + "\r\n"
357 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
359 return tw_client.getPage(url, method="POST", postdata=body,
360 headers=headers, followRedirect=False)
362 def GET_discard(self, urlpath, stall):
363 url = self.webish_url + urlpath + "?filename=dummy-get.out"
364 return discardPage(url, stall)
366 def _print_usage(self, res=None):
367 d = self.control_rref.callRemote("get_memory_usage")
369 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
372 d.addCallback(_print)
375 def _do_upload(self, res, size, files, uris):
378 print "uploading %s" % name
379 if self.mode in ("upload", "upload-self"):
380 files[name] = self.create_data(name, size)
381 d = self.control_rref.callRemote("upload_from_file_to_uri",
382 files[name], convergence="check-memory convergence string")
384 os.remove(files[name])
388 elif self.mode == "upload-POST":
391 d = self.POST(url, t="upload", file=("%d.data" % size, data))
392 elif self.mode in ("receive",
393 "download", "download-GET", "download-GET-slow"):
394 # mode=receive: upload the data from a local peer, so that the
395 # client-under-test receives and stores the shares
397 # mode=download*: upload the data from a local peer, then have
398 # the client-under-test download it.
400 # we need to wait until the uploading node has connected to all
401 # peers, since the wait_for_client_connections() above doesn't
402 # pay attention to our self.nodes[] and their connections.
403 files[name] = self.create_data(name, size)
404 u = self.nodes[0].getServiceNamed("uploader")
405 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
406 d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
407 d.addCallback(lambda results: results.uri)
409 raise ValueError("unknown mode=%s" % self.mode)
412 print "uploaded %s" % name
413 d.addCallback(_complete)
416 def _do_download(self, res, size, uris):
417 if self.mode not in ("download", "download-GET", "download-GET-slow"):
420 print "downloading %s" % name
423 if self.mode == "download":
424 d = self.control_rref.callRemote("download_from_uri_to_file",
426 elif self.mode == "download-GET":
427 url = "/uri/%s" % uri
428 d = self.GET_discard(urllib.quote(url), stall=False)
429 elif self.mode == "download-GET-slow":
430 url = "/uri/%s" % uri
431 d = self.GET_discard(urllib.quote(url), stall=True)
434 print "downloaded %s" % name
436 d.addCallback(_complete)
440 #print "CLIENT STARTED"
441 #print "FURL", self.control_furl
442 #print "RREF", self.control_rref
444 kB = 1000; MB = 1000*1000
448 d = self._print_usage()
449 d.addCallback(self.stash_stats, "0B")
452 d.addCallback(self._do_upload, 10*kB+i, files, uris)
453 d.addCallback(self._do_download, 10*kB+i, uris)
454 d.addCallback(self._print_usage)
455 d.addCallback(self.stash_stats, "10kB")
458 d.addCallback(self._do_upload, 10*MB+i, files, uris)
459 d.addCallback(self._do_download, 10*MB+i, uris)
460 d.addCallback(self._print_usage)
461 d.addCallback(self.stash_stats, "10MB")
464 d.addCallback(self._do_upload, 50*MB+i, files, uris)
465 d.addCallback(self._do_download, 50*MB+i, uris)
466 d.addCallback(self._print_usage)
467 d.addCallback(self.stash_stats, "50MB")
470 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
471 # d.addCallback(self._do_download, 100*MB+i, uris)
472 # d.addCallback(self._print_usage)
473 #d.addCallback(self.stash_stats, "100MB")
475 #d.addCallback(self.stall)
481 def stall(self, res):
483 reactor.callLater(5, d.callback, None)
487 class ClientWatcher(protocol.ProcessProtocol):
489 def outReceived(self, data):
491 def errReceived(self, data):
493 def processEnded(self, reason):
495 self.d.callback(None)
498 if __name__ == '__main__':
500 if len(sys.argv) > 1:
502 # put the logfile and stats.out in _test_memory/ . These stick around.
503 # put the nodes and other files in _test_memory/test/ . These are
504 # removed each time we run.
505 sf = SystemFramework("_test_memory", mode)