3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
12 from allmydata.util.fileutil import abspath_expanduser_unicode
13 from allmydata.util.encodingutil import get_filesystem_encoding
14 from foolscap.api import Tub, fireEventually, flushEventualQueue
15 from twisted.python import log
# An HTTPPageGetter that counts and throws away the page body as it
# arrives, and (when the factory's do_stall flag is set) pauses the
# transport partway through the download, resuming it 10 seconds later.
# NOTE(review): several interior lines of this class are elided in this
# excerpt (guard-clause bodies, the byte counter, stall bookkeeping).
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    # becomes True once the stall timer has fired and we resumed reading
    full_speed_ahead = False
    def handleResponsePart(self, data):
        # tally the bytes received; the data itself is discarded
        self._bytes_so_far += len(data)
        # NOTE(review): the bodies of the next three guards are elided
        if not self.factory.do_stall:
        if self.full_speed_ahead:
        if self._bytes_so_far > 1e6+100:
                # after ~1MB, stop reading and schedule a resume in 10s
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)
    def _resume_speed(self):
        # timer callback: stop stalling and read the rest at full speed
        self.full_speed_ahead = True
        self.transport.resumeProducing()
    def handleResponseEnd(self):
        # NOTE(review): lines that cancel a pending stall timer appear
        # to be elided here; delegate to the base class to finish up
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
# HTTPClientFactory variant whose protocol discards the page body and
# can stall mid-download; callers set `do_stall` on the factory
# instance (see discardPage) to control the stalling behaviour.
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching `url`, discarding the body as it arrives.

    When `stall` is true, the pipe is paused after roughly the first
    1MB, then resumed ten seconds later and drained to the end (see
    StallableHTTPGetterDiscarder). Returns the factory's Deferred,
    which fires when the fetch completes.
    """
    # Adapted from twisted.web.client.getPage, which offers no hook for
    # substituting our own HTTPClientFactory, so we re-do its plumbing.
    scheme, host, port, path = tw_client._parse(url)
    assert scheme == 'http'
    f = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    f.do_stall = stall
    reactor.connectTCP(host, port, f)
    return f.deferred
class ChildDidNotStartError(Exception):
    """Raised while polling for startup when the spawned twistd client
    process exits with a non-zero status before creating control.furl."""
class SystemFramework(pollmixin.PollMixin):
    """Memory-measurement harness for a Tahoe client node.

    Builds a throwaway grid under basedir/test (an introducer plus
    self.numnodes internal nodes), spawns a separate client-under-test
    with twistd, then uploads/downloads files of increasing size while
    asking the client (over its control.furl) for its memory usage.

    NOTE(review): this excerpt elides many lines -- several `def`
    headers, `return` statements and helper assignments are missing;
    comments below flag the gaps that affect readability.
    """

    def __init__(self, basedir, mode):
        # refuse to run anywhere but under the current directory, since
        # part of basedir is rmtree()d on every run
        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        # parent MultiService; everything we start hangs off it and is
        # stopped again in tearDown()
        self.sparent = service.MultiService()
        self.sparent.startService()
        # NOTE(review): the Tub construction line is elided just above here
        self.tub.setOption("expose-remote-exception-types", False)
        self.tub.setServiceParent(self.sparent)
        self.keepalive_file = None

    # NOTE(review): a `def run(self):` header is elided here; the
    # following lines are the body of run(), the top-level entry point.
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        # chain the whole test sequence onto a single Deferred
        # (NOTE(review): the creation of `d` is elided)
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        # NOTE(review): error capture and reactor startup/shutdown are
        # elided; the next two lines re-raise a failure recorded during
        # the run, after the reactor has stopped
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()

    # NOTE(review): a `def setUp(self):` header is elided here.
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
        # NOTE(review): the callback wrapper around the next line is elided
            return self.start_client()
        def _record_control_furl(control_furl):
            # stash the furl, then turn it into a live remote reference
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
        # NOTE(review): the `def _ready(res):` header is elided
            #print "CLIENT READY"
        d.addCallback(_ready)

    def record_initial_memusage(self):
        # snapshot the client's memory usage before it has made any
        # connections; stashed under the "init" label
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")

    def wait_for_client_connected(self):
        # fires once the client-under-test reports the expected number
        # of connections (NOTE(review): the argument continuation line
        # of this call is elided)
        print "Client connecting to other nodes.."
        return self.control_rref.callRemote("wait_for_client_connections",

    def tearDown(self, passthrough):
        """Stop all services, then hand `passthrough` back to the caller."""
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        # NOTE(review): a guard (presumably `if self.proc:` -- confirm)
        # is elided before the indented callback below
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        # propagate the original result/failure unchanged
        d.addCallback(lambda res: passthrough)

    def add_service(self, s):
        # attach `s` to our parent service so tearDown() stops it too
        s.setServiceParent(self.sparent)

    def make_introducer(self):
        # create and start the introducer node; record its furl so the
        # other nodes can be pointed at it
        iv_basedir = os.path.join(self.testdir, "introducer")
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            # NOTE(review): the binding of `q` (apparently the
            # introducer node) is elided here
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)

    def make_nodes(self):
        """Create and start the internal storage nodes."""
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
            # NOTE(review): the f.write( opener and adjacent config
            # lines are elided around this string continuation
                    "introducer.furl = %s\n"
                    % (self.introducer_furl,))
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            if self.mode in ("download", "download-GET", "download-GET-slow"):
            # NOTE(review): an `else:` branch header is elided -- per
            # the comment below, debug_discard applies to the
            # NON-download modes
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f.write("debug_discard = true\n")
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f.write("readonly = true\n")
            c = self.add_service(client.Client(basedir=nodedir))
        # the peers will start running, eventually they will connect to each
        # other and the introducer

    def touch_keepalive(self):
        """Refresh the keepalive file's mtime (run once per second)."""
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        # NOTE(review): the f.write(""" opener is elided; the following
        # column-0 lines are the keepalive file's literal contents
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.

    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        # self.testdir is an absolute Unicode path
        clientdir = self.clientdir = os.path.join(self.testdir, u"client")
        clientdir_str = clientdir.encode(get_filesystem_encoding())
        # NOTE(review): the creation of `quiet` (output sink) is elided
        create_node.create_node({'basedir': clientdir}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        # now replace tahoe.cfg
        # set webport=0 and then ask the node what port it picked.
        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
        # NOTE(review): the f.write( opener and some config lines are
        # elided around these string continuations
                "web.port = tcp:0:interface=127.0.0.1\n"
                "introducer.furl = %s\n"
                % (self.introducer_furl,))
        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
        # NOTE(review): the `else:` header is elided before this branch
            # don't accept any shares
            f.write("readonly = true\n")
            ## also, if we do receive any shares, throw them away
            #f.write("debug_discard = true")
        if self.mode == "upload-self":
        # keepalive: the node watches this file's mtime (see the text
        # written by touch_keepalive above)
        self.keepalive_file = os.path.join(clientdir,
                                           "suicide_prevention_hotline")
        # now start updating the mtime.
        self.touch_keepalive()
        ts = internet.TimerService(1.0, self.touch_keepalive)
        ts.setServiceParent(self.sparent)
        # launch the client in a child process under twistd
        # (NOTE(review): the creation of `pp`, a ClientWatcher, is elided)
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
        log.msg("CLIENT STARTED")
        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        url_file = os.path.join(clientdir, "node.url")
        # NOTE(review): a `def _check():` header is elided here
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise ChildDidNotStartError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
        # NOTE(review): a `def _stall(res):` header is elided here
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
        d.addCallback(_stall)
        # NOTE(review): a reader-callback header is elided here
            # read the node's URL
            self.webish_url = open(url_file, "r").read().strip()
            if self.webish_url[-1] == "/":
                # trim trailing slash, since the rest of the code wants it gone
                self.webish_url = self.webish_url[:-1]
            f = open(furl_file, "r")

    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        # NOTE(review): a `try:` header is elided before the next line,
        # and the except-branch body (a no-op) after it
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
        return self.proc_done

    def create_data(self, name, size):
        # write `size` bytes of throwaway data to testdir/<name>.data
        # (NOTE(review): the fill loop and return are elided)
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")

    def stash_stats(self, stats, name):
        # append the peak-memory figure for this phase to stats.out and
        # remember it in self.stats under `name`
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

    def POST(self, urlpath, **fields):
        """POST a hand-built multipart/form-data body to the web API.

        Each keyword is a form field; a (filename, value) tuple marks a
        file-upload field.
        """
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        # NOTE(review): initialization of `form` and the separator
        # bookkeeping lines are partially elided around here
        form.append('Content-Disposition: form-data; name="_charset"')
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
            # NOTE(review): the `else:` header is elided before this line
                form.append('Content-Disposition: form-data; name="%s"' % name)
        body = "\r\n".join(form) + "\r\n"
        # NOTE(review): the closing brace of this dict literal is elided
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        # GET the path via the client's web API, discarding the body
        # (discardPage stalls mid-download when `stall` is true)
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

    def _print_usage(self, res=None):
        # ask the client-under-test for its memory usage and print it
        d = self.control_rref.callRemote("get_memory_usage")
        # NOTE(review): the `def _print(stats):` header and the
        # continuation of this print are elided
            print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
        d.addCallback(_print)

    def _do_upload(self, res, size, files, uris):
        """Upload `size` bytes using the technique chosen by self.mode."""
        # NOTE(review): the binding of `name` is elided above this line
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            # upload a local file through the client's control port
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name].encode("utf-8"),
                                             convergence="check-memory")
            # (inside an elided completion callback: clean up the source)
                os.remove(files[name])
        elif self.mode == "upload-POST":
            # upload via the web API form handler
            # (NOTE(review): bindings of `data` and `url` are elided)
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res:
                          u.upload(upload.FileName(files[name],
                                                   convergence="check-memory")))
            d.addCallback(lambda results: results.uri)
        # NOTE(review): the `else:` header is elided before this line
            raise ValueError("unknown mode=%s" % self.mode)
        # (inside an elided `_complete` callback that records the URI)
            print "uploaded %s" % name
        d.addCallback(_complete)

    def _do_download(self, res, size, uris):
        # in the download modes, fetch back what _do_upload stored
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
        # NOTE(review): early-return body and `name`/`uri` bindings elided
        print "downloading %s" % name
        if self.mode == "download":
            # fetch via the control port (argument continuation elided)
            d = self.control_rref.callRemote("download_from_uri_to_file",
        elif self.mode == "download-GET":
            # fetch via HTTP GET, discarding the body
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            # same, but stall the pipe after the first ~1MB
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)
        # (inside an elided `_complete` callback)
            print "downloaded %s" % name
        d.addCallback(_complete)

    # NOTE(review): a `def do_test(self):` header is elided here; the
    # following lines chain the upload/download cycle at several sizes,
    # recording memory stats after each batch.
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        kB = 1000; MB = 1000*1000
        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")
        # (inside an elided `for i in range(...)` loop)
            d.addCallback(self._do_upload, 10*kB+i, files, uris)
            d.addCallback(self._do_download, 10*kB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")
        # (inside an elided `for` loop)
            d.addCallback(self._do_upload, 10*MB+i, files, uris)
            d.addCallback(self._do_download, 10*MB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")
        # (inside an elided `for` loop)
            d.addCallback(self._do_upload, 50*MB+i, files, uris)
            d.addCallback(self._do_download, 50*MB+i, uris)
        d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")
        # d.addCallback(self._do_upload, 100*MB+i, files, uris)
        # d.addCallback(self._do_download, 100*MB+i, uris)
        # d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")
        #d.addCallback(self.stall)

    def stall(self, res):
        # debugging helper: pause the callback chain for five seconds
        # (NOTE(review): the creation and return of `d` are elided)
        reactor.callLater(5, d.callback, None)
# ProcessProtocol watching the spawned twistd client. Its Deferred `d`
# is assigned externally (SystemFramework.start_client sets
# self.proc_done = pp.d) and fires when the child process ends.
class ClientWatcher(protocol.ProcessProtocol):
    def outReceived(self, data):
        # NOTE(review): body elided in this excerpt
    def errReceived(self, data):
        # NOTE(review): body elided in this excerpt
    def processEnded(self, reason):
        # NOTE(review): a line recording `reason` (read by start_client
        # as pp.ended) appears to be elided here
        self.d.callback(None)
# Command-line entry point: run the memory-check harness, taking the
# test mode from argv[1] when given.
if __name__ == '__main__':
    # NOTE(review): the default `mode` assignment is elided above this
    if len(sys.argv) > 1:
        # NOTE(review): the `mode = sys.argv[1]` body is elided
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)