1 import os, shutil, sys, urllib, time, stat, urlparse
2 from cStringIO import StringIO
3 from twisted.internet import defer, reactor, protocol, error
4 from twisted.application import service, internet
5 from twisted.web import client as tw_client
6 from allmydata import client, introducer
7 from allmydata.immutable import upload
8 from allmydata.scripts import create_node
9 from allmydata.util import fileutil, pollmixin
10 from allmydata.util.fileutil import abspath_expanduser_unicode
11 from allmydata.util.encodingutil import get_filesystem_encoding
12 from foolscap.api import Tub, fireEventually, flushEventualQueue
13 from twisted.python import log
# An HTTP page getter that discards the response body, used to exercise a
# download without holding it in memory. With factory.do_stall set, it
# simulates a slow client: after the first ~1MB it pauses the transport,
# then a 10-second timer (_resume_speed) resumes it at full speed.
# NOTE(review): this listing is elided (embedded line numbers 17-18, 22, 24,
# 26-27, 31-32, 36-39 are missing), so several statements — presumably the
# `_bytes_so_far`/`stalled` class attrs, early `return`s, and the stall
# bookkeeping in handleResponseEnd — are not visible here.
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
# once True, stop stalling and just consume the rest of the response
16 full_speed_ahead = False
# Twisted callback: invoked for each chunk of response body.
19 def handleResponsePart(self, data):
20 self._bytes_so_far += len(data)
# only stall when the factory asked for it (set by discardPage)
21 if not self.factory.do_stall:
23 if self.full_speed_ahead:
# stall once we are just past 1MB of received data
25 if self._bytes_so_far > 1e6+100:
26 self.transport.pauseProducing()
# schedule the resume; `stalled` holds the IDelayedCall so it can
# presumably be cancelled at response end — TODO confirm (elided)
29 self.stalled = reactor.callLater(10.0, self._resume_speed)
# timer callback: stop stalling and let the rest of the download flow
30 def _resume_speed(self):
33 self.full_speed_ahead = True
34 self.transport.resumeProducing()
# Twisted callback: end of response; delegate to the base class after
# (presumably) cancelling any outstanding stall timer — elided here
35 def handleResponseEnd(self):
40 return tw_client.HTTPPageGetter.handleResponseEnd(self)
# HTTPClientFactory whose protocol discards the page body and can stall;
# callers set a `do_stall` attribute on the factory instance (see
# discardPage) which the protocol consults per data chunk.
42 class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
43 protocol = StallableHTTPGetterDiscarder
# Fetch `url` over plain HTTP, discarding the body; with stall=True the
# protocol pauses after ~1MB and resumes 10s later. Returns the factory's
# Deferred, which fires when the fetch completes. Extra *args/**kwargs are
# passed through to the HTTPClientFactory.
45 def discardPage(url, stall=False, *args, **kwargs):
46 """Start fetching the URL, but stall our pipe after the first 1MB.
47 Wait 10 seconds, then resume downloading (and discarding) everything.
49 # adapted from twisted.web.client.getPage . We can't just wrap or
50 # subclass because it provides no way to override the HTTPClientFactory
52 scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
# only http is supported (port defaults to 80 below)
53 assert scheme == 'http'
54 host, port = netloc, 80
# NOTE(review): the guard for this split (presumably `if ":" in host:`,
# followed by `port = int(port)`) is elided from this listing — without
# it this would fail on bare hostnames; confirm against the original.
56 host, port = host.split(":")
58 factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
# tell the protocol whether to simulate the slow-client stall
59 factory.do_stall = stall
60 reactor.connectTCP(host, port, factory)
61 return factory.deferred
# Raised by the startup poll in start_client() when the spawned twistd
# child exits with a nonzero status before its control.furl appears.
# (Class body is elided from this listing — presumably just `pass`.)
63 class ChildDidNotStartError(Exception):
# Test harness that measures the memory footprint of a Tahoe client node
# under various upload/download workloads. It builds a throwaway grid
# (an introducer plus several in-process storage nodes) under basedir/test,
# spawns the client-under-test as a separate twistd process, drives it via
# a foolscap control RREF and its web API, and appends VmPeak samples to
# basedir/stats.out. `mode` selects the workload ("upload", "upload-self",
# "upload-POST", "receive", "download", "download-GET", "download-GET-slow").
# NOTE(review): this listing is elided throughout (embedded line numbers are
# non-contiguous); several method headers — presumably `def run`, `def setUp`,
# and `def do_test` — and many statements are missing below.
66 class SystemFramework(pollmixin.PollMixin):
69 def __init__(self, basedir, mode):
70 self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
# refuse to operate outside the current directory: testdir is rmtree'd
71 if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
72 raise AssertionError("safety issue: basedir must be a subdir")
73 self.testdir = testdir = os.path.join(basedir, "test")
# start from a clean slate on every run
74 if os.path.exists(testdir):
75 shutil.rmtree(testdir)
76 fileutil.make_dirs(testdir)
# parent MultiService for every in-process service we start
77 self.sparent = service.MultiService()
78 self.sparent.startService()
# NOTE(review): Tub construction (presumably `self.tub = Tub()`) is
# elided between embedded lines 78 and 81
81 self.tub.setOption("expose-remote-exception-types", False)
82 self.tub.setServiceParent(self.sparent)
# path of the client's exit-trigger file; set later by start_client
85 self.keepalive_file = None
# --- presumably the body of an elided `def run(self):` starts here ---
88 framelog = os.path.join(self.basedir, "driver.log")
89 log.startLogging(open(framelog, "a"), setStdout=False)
90 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
91 #logfile = open(os.path.join(self.testdir, "log"), "w")
92 #flo = log.FileLogObserver(logfile)
93 #log.startLoggingWithObserver(flo.emit, setStdout=False)
# main sequence: set up grid, snapshot memory, start peers, run workload
95 d.addCallback(lambda res: self.setUp())
96 d.addCallback(lambda res: self.record_initial_memusage())
97 d.addCallback(lambda res: self.make_nodes())
98 d.addCallback(lambda res: self.wait_for_client_connected())
99 d.addCallback(lambda res: self.do_test())
# tearDown runs on success and on failure
100 d.addBoth(self.tearDown)
112 # raiseException doesn't work for CopiedFailures
113 self.failed.raiseException()
# --- presumably the body of an elided `def setUp(self):` starts here ---
# append-mode so repeated runs accumulate stats across modes
118 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
119 d = self.make_introducer()
121 return self.start_client()
# start_client fires with the client's control.furl; turn it into a
# live foolscap reference for driving the client remotely
123 def _record_control_furl(control_furl):
124 self.control_furl = control_furl
125 #print "OBTAINING '%s'" % (control_furl,)
126 return self.tub.getReference(self.control_furl)
127 d.addCallback(_record_control_furl)
128 def _record_control(control_rref):
129 self.control_rref = control_rref
130 d.addCallback(_record_control)
132 #print "CLIENT READY"
134 d.addCallback(_ready)
# snapshot the client's memory usage before any grid connections exist
137 def record_initial_memusage(self):
139 print "Client started (no connections yet)"
140 d = self._print_usage()
141 d.addCallback(self.stash_stats, "init")
# block until the client-under-test reports enough peer connections
144 def wait_for_client_connected(self):
146 print "Client connecting to other nodes.."
147 return self.control_rref.callRemote("wait_for_client_connections",
# shut everything down; `passthrough` is the result/failure from do_test
# and is re-delivered at the end of the chain
150 def tearDown(self, passthrough):
151 # the client node will shut down in a few seconds
152 #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
153 log.msg("shutting down SystemTest services")
# log how stale the keepalive (exit-trigger) file was at shutdown
154 if self.keepalive_file and os.path.exists(self.keepalive_file):
155 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
156 log.msg("keepalive file at shutdown was %ds old" % age)
157 d = defer.succeed(None)
159 d.addCallback(lambda res: self.kill_client())
160 d.addCallback(lambda res: self.sparent.stopService())
161 d.addCallback(lambda res: flushEventualQueue())
162 def _close_statsfile(res):
163 self.statsfile.close()
164 d.addCallback(_close_statsfile)
165 d.addCallback(lambda res: passthrough)
# attach a service to our parent MultiService (starts it) and return it
168 def add_service(self, s):
169 s.setServiceParent(self.sparent)
# create and start the introducer node; records self.introducer_furl
172 def make_introducer(self):
173 iv_basedir = os.path.join(self.testdir, "introducer")
175 iv = introducer.IntroducerNode(basedir=iv_basedir)
176 self.introducer = self.add_service(iv)
177 d = self.introducer.when_tub_ready()
178 def _introducer_ready(res):
# NOTE(review): the assignment of `q` is elided here; presumably
# `q = self.introducer` — confirm against the original
180 self.introducer_furl = q.introducer_url
181 d.addCallback(_introducer_ready)
# create self.numnodes in-process storage nodes, configured per mode
184 def make_nodes(self):
186 for i in range(self.numnodes):
187 nodedir = os.path.join(self.testdir, "node%d" % i)
189 f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
191 "introducer.furl = %s\n"
194 % (self.introducer_furl,))
195 # the only tests for which we want the internal nodes to actually
196 # retain shares are the ones where somebody's going to download
198 if self.mode in ("download", "download-GET", "download-GET-slow"):
202 # for these tests, we tell the storage servers to pretend to
203 # accept shares, but really just throw them out, since we're
204 # only testing upload and not download.
205 f.write("debug_discard = true\n")
206 if self.mode in ("receive",):
207 # for this mode, the client-under-test gets all the shares,
208 # so our internal nodes can refuse requests
209 f.write("readonly = true\n")
211 c = self.add_service(client.Client(basedir=nodedir))
213 # the peers will start running, eventually they will connect to each
214 # other and the introducer
# refresh the mtime of the client's exit-trigger file so the child
# process keeps running; called once per second by a TimerService
216 def touch_keepalive(self):
217 if os.path.exists(self.keepalive_file):
218 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
219 log.msg("touching keepalive file, was %ds old" % age)
220 f = open(self.keepalive_file, "w")
222 If the node notices this file at startup, it will poll every 5 seconds and
223 terminate if the file is more than 10 seconds old, or if it has been deleted.
224 If the test harness has an internal failure and neglects to kill off the node
225 itself, this helps to avoid leaving processes lying around. The contents of
226 this file are ignored.
230 def start_client(self):
231 # this returns a Deferred that fires with the client's control.furl
232 log.msg("MAKING CLIENT")
233 # self.testdir is an absolute Unicode path
234 clientdir = self.clientdir = os.path.join(self.testdir, u"client")
235 clientdir_str = clientdir.encode(get_filesystem_encoding())
237 create_node.create_node({'basedir': clientdir}, out=quiet)
238 log.msg("DONE MAKING CLIENT")
239 # now replace tahoe.cfg
240 # set webport=0 and then ask the node what port it picked.
241 f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
243 "web.port = tcp:0:interface=127.0.0.1\n"
245 "introducer.furl = %s\n"
248 % (self.introducer_furl,))
250 if self.mode in ("upload-self", "receive"):
251 # accept and store shares, to trigger the memory consumption bugs
254 # don't accept any shares
255 f.write("readonly = true\n")
256 ## also, if we do receive any shares, throw them away
257 #f.write("debug_discard = true")
258 if self.mode == "upload-self":
# arrange for the child to self-terminate if we stop touching the
# exit-trigger file (see touch_keepalive)
261 self.keepalive_file = os.path.join(clientdir,
262 client.Client.EXIT_TRIGGER_FILE)
263 # now start updating the mtime.
264 self.touch_keepalive()
265 ts = internet.TimerService(1.0, self.touch_keepalive)
266 ts.setServiceParent(self.sparent)
# pp is presumably a ClientWatcher (construction elided); proc_done
# fires when the spawned process exits
269 self.proc_done = pp.d = defer.Deferred()
270 logfile = os.path.join(self.basedir, "client.log")
271 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
272 env = os.environ.copy()
273 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
274 log.msg("CLIENT STARTED")
276 # now we wait for the client to get started. we're looking for the
277 # control.furl file to appear.
278 furl_file = os.path.join(clientdir, "private", "control.furl")
279 url_file = os.path.join(clientdir, "node.url")
281 if pp.ended and pp.ended.value.status != 0:
282 # the twistd process ends normally (with rc=0) if the child
283 # is successfully launched. It ends abnormally (with rc!=0)
284 # if the child cannot be launched.
285 raise ChildDidNotStartError("process ended while waiting for startup")
286 return os.path.exists(furl_file)
287 d = self.poll(_check, 0.1)
288 # once it exists, wait a moment before we read from it, just in case
289 # it hasn't finished writing the whole thing. Ideally control.furl
290 # would be created in some atomic fashion, or made non-readable until
291 # it's ready, but I can't think of an easy way to do that, and I
292 # think the chances that we'll observe a half-write are pretty low.
294 d2 = defer.Deferred()
295 reactor.callLater(0.1, d2.callback, None)
297 d.addCallback(_stall)
299 # read the node's URL
300 self.webish_url = open(url_file, "r").read().strip()
301 if self.webish_url[-1] == "/":
302 # trim trailing slash, since the rest of the code wants it gone
303 self.webish_url = self.webish_url[:-1]
304 f = open(furl_file, "r")
311 def kill_client(self):
312 # returns a Deferred that fires when the process exits. This may only
# SIGINT the child; ProcessExitedAlready just means it beat us to it
315 self.proc.signalProcess("INT")
316 except error.ProcessExitedAlready:
318 return self.proc_done
# write `size` bytes of test data to testdir/<name>.data and (presumably,
# via an elided return) hand back the filename
321 def create_data(self, name, size):
322 filename = os.path.join(self.testdir, name + ".data")
323 f = open(filename, "wb")
# append one "<mode> <name>: <VmPeak>" line to stats.out and remember it
331 def stash_stats(self, stats, name):
332 self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
333 self.statsfile.flush()
334 self.stats[name] = stats['VmPeak']
# hand-rolled multipart/form-data POST to the client's web API; values
# may be (filename, content) tuples for file fields
336 def POST(self, urlpath, **fields):
337 url = self.webish_url + urlpath
338 sepbase = "boogabooga"
342 form.append('Content-Disposition: form-data; name="_charset"')
346 for name, value in fields.iteritems():
347 if isinstance(value, tuple):
348 filename, value = value
349 form.append('Content-Disposition: form-data; name="%s"; '
350 'filename="%s"' % (name, filename))
352 form.append('Content-Disposition: form-data; name="%s"' % name)
357 body = "\r\n".join(form) + "\r\n"
358 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
360 return tw_client.getPage(url, method="POST", postdata=body,
361 headers=headers, followRedirect=False)
# GET urlpath through the discarding (optionally stalling) fetcher above
363 def GET_discard(self, urlpath, stall):
# ?filename= appended to get the whole file, instead of an error
364 url = self.webish_url + urlpath + "?filename=dummy-get.out"
365 return discardPage(url, stall)
# ask the client for its memory usage and print VmSize/VmPeak; the
# Deferred fires with the stats dict for stash_stats
367 def _print_usage(self, res=None):
368 d = self.control_rref.callRemote("get_memory_usage")
370 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
373 d.addCallback(_print)
# upload `size` bytes via the path appropriate to self.mode; records the
# resulting URI in uris[] and the source file in files[]
376 def _do_upload(self, res, size, files, uris):
379 print "uploading %s" % name
380 if self.mode in ("upload", "upload-self"):
# client-under-test uploads from a local file via the control port
381 files[name] = self.create_data(name, size)
382 d = self.control_rref.callRemote("upload_from_file_to_uri",
383 files[name].encode("utf-8"),
384 convergence="check-memory")
386 os.remove(files[name])
390 elif self.mode == "upload-POST":
# client-under-test uploads via its web API
393 d = self.POST(url, t="upload", file=("%d.data" % size, data))
394 elif self.mode in ("receive",
395 "download", "download-GET", "download-GET-slow"):
396 # mode=receive: upload the data from a local peer, so that the
397 # client-under-test receives and stores the shares
399 # mode=download*: upload the data from a local peer, then have
400 # the client-under-test download it.
402 # we need to wait until the uploading node has connected to all
403 # peers, since the wait_for_client_connections() above doesn't
404 # pay attention to our self.nodes[] and their connections.
405 files[name] = self.create_data(name, size)
406 u = self.nodes[0].getServiceNamed("uploader")
407 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
408 d.addCallback(lambda res:
409 u.upload(upload.FileName(files[name],
410 convergence="check-memory")))
411 d.addCallback(lambda results: results.get_uri())
413 raise ValueError("unknown mode=%s" % self.mode)
416 print "uploaded %s" % name
417 d.addCallback(_complete)
# in download* modes, have the client-under-test fetch the URI stored
# by _do_upload; no-op for other modes
420 def _do_download(self, res, size, uris):
421 if self.mode not in ("download", "download-GET", "download-GET-slow"):
424 print "downloading %s" % name
427 if self.mode == "download":
428 d = self.control_rref.callRemote("download_from_uri_to_file",
430 elif self.mode == "download-GET":
431 url = "/uri/%s" % uri
432 d = self.GET_discard(urllib.quote(url), stall=False)
433 elif self.mode == "download-GET-slow":
434 url = "/uri/%s" % uri
435 d = self.GET_discard(urllib.quote(url), stall=True)
438 print "downloaded %s" % name
440 d.addCallback(_complete)
# --- presumably the body of an elided `def do_test(self):` starts here:
# run upload/download rounds at increasing sizes, sampling memory after
# each and stashing VmPeak under a size-named key ---
444 #print "CLIENT STARTED"
445 #print "FURL", self.control_furl
446 #print "RREF", self.control_rref
448 kB = 1000; MB = 1000*1000
452 d = self._print_usage()
453 d.addCallback(self.stash_stats, "0B")
456 d.addCallback(self._do_upload, 10*kB+i, files, uris)
457 d.addCallback(self._do_download, 10*kB+i, uris)
458 d.addCallback(self._print_usage)
459 d.addCallback(self.stash_stats, "10kB")
462 d.addCallback(self._do_upload, 10*MB+i, files, uris)
463 d.addCallback(self._do_download, 10*MB+i, uris)
464 d.addCallback(self._print_usage)
465 d.addCallback(self.stash_stats, "10MB")
468 d.addCallback(self._do_upload, 50*MB+i, files, uris)
469 d.addCallback(self._do_download, 50*MB+i, uris)
470 d.addCallback(self._print_usage)
471 d.addCallback(self.stash_stats, "50MB")
474 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
475 # d.addCallback(self._do_download, 100*MB+i, uris)
476 # d.addCallback(self._print_usage)
477 #d.addCallback(self.stash_stats, "100MB")
479 #d.addCallback(self.stall)
# debugging helper: a Deferred that fires after a 5-second pause
485 def stall(self, res):
487 reactor.callLater(5, d.callback, None)
# ProcessProtocol attached to the spawned twistd client (see start_client);
# fires self.d (assigned externally as `pp.d`) when the child process ends.
# NOTE(review): the out/err handler bodies are elided from this listing —
# presumably they print or discard the child's output.
491 class ClientWatcher(protocol.ProcessProtocol):
493 def outReceived(self, data):
495 def errReceived(self, data):
# start_client's poll inspects `ended`/`reason` to detect launch failure;
# the elided lines here presumably record `reason` before firing
497 def processEnded(self, reason):
499 self.d.callback(None)
# Script entry point: build a SystemFramework in ./_test_memory and run it.
# NOTE(review): the default-mode assignment and the `mode = sys.argv[1]`
# line (embedded lines 503 and 505) plus the final `sf.run()` call are
# elided from this listing.
502 if __name__ == '__main__':
504 if len(sys.argv) > 1:
506 # put the logfile and stats.out in _test_memory/ . These stick around.
507 # put the nodes and other files in _test_memory/test/ . These are
508 # removed each time we run.
509 sf = SystemFramework("_test_memory", mode)