3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
13 from foolscap import eventual
14 from twisted.python import log
16 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
17 full_speed_ahead = False
20 def handleResponsePart(self, data):
21 self._bytes_so_far += len(data)
22 if not self.factory.do_stall:
24 if self.full_speed_ahead:
26 if self._bytes_so_far > 1e6+100:
29 self.transport.pauseProducing()
30 self.stalled = reactor.callLater(10.0, self._resume_speed)
31 def _resume_speed(self):
34 self.full_speed_ahead = True
35 self.transport.resumeProducing()
36 def handleResponseEnd(self):
41 return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory variant that speaks StallableHTTPGetterDiscarder.

    discardPage() sets a ``do_stall`` attribute on each instance, and the
    protocol reads it back through ``self.factory.do_stall``.
    """
    # Protocol class used for connections made through this factory.
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, optionally stalling our pipe after the first 1MB.

    When stalling is requested the protocol waits 10 seconds, then resumes
    downloading (and discarding) everything.

    url: the http URL to fetch.
    stall: stored on the factory as ``do_stall``; the protocol consults it
        before pausing the transport.
    *args, **kwargs: passed through to StallableDiscardingHTTPClientFactory.

    Returns the factory's ``deferred``.
    Raises AssertionError if the URL scheme is not 'http'.
    """
    # Adapted from twisted.web.client.getPage. We can't just wrap or subclass
    # getPage because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    # Use an explicit raise instead of an assert statement so the check is
    # not stripped when running under "python -O", and do it before building
    # a factory that would never be connected.
    if scheme != 'http':
        raise AssertionError("discardPage only supports http URLs, not %r"
                             % (scheme,))
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
60 class SystemFramework(pollmixin.PollMixin):
63 def __init__(self, basedir, mode):
64 self.basedir = basedir = os.path.abspath(basedir)
65 if not basedir.startswith(os.path.abspath(".")):
66 raise AssertionError("safety issue: basedir must be a subdir")
67 self.testdir = testdir = os.path.join(basedir, "test")
68 if os.path.exists(testdir):
69 shutil.rmtree(testdir)
70 fileutil.make_dirs(testdir)
71 self.sparent = service.MultiService()
72 self.sparent.startService()
74 self.tub = foolscap.Tub()
75 self.tub.setServiceParent(self.sparent)
78 self.keepalive_file = None
81 framelog = os.path.join(self.basedir, "driver.log")
82 log.startLogging(open(framelog, "a"), setStdout=False)
83 log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
84 #logfile = open(os.path.join(self.testdir, "log"), "w")
85 #flo = log.FileLogObserver(logfile)
86 #log.startLoggingWithObserver(flo.emit, setStdout=False)
87 d = eventual.fireEventually()
88 d.addCallback(lambda res: self.setUp())
89 d.addCallback(lambda res: self.record_initial_memusage())
90 d.addCallback(lambda res: self.make_nodes())
91 d.addCallback(lambda res: self.wait_for_client_connected())
92 d.addCallback(lambda res: self.do_test())
93 d.addBoth(self.tearDown)
105 # raiseException doesn't work for CopiedFailures
106 self.failed.raiseException()
111 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
112 d = self.make_introducer()
114 return self.start_client()
116 def _record_control_furl(control_furl):
117 self.control_furl = control_furl
118 #print "OBTAINING '%s'" % (control_furl,)
119 return self.tub.getReference(self.control_furl)
120 d.addCallback(_record_control_furl)
121 def _record_control(control_rref):
122 self.control_rref = control_rref
123 d.addCallback(_record_control)
125 #print "CLIENT READY"
127 d.addCallback(_ready)
130 def record_initial_memusage(self):
132 print "Client started (no connections yet)"
133 d = self._print_usage()
134 d.addCallback(self.stash_stats, "init")
137 def wait_for_client_connected(self):
139 print "Client connecting to other nodes.."
140 return self.control_rref.callRemote("wait_for_client_connections",
143 def tearDown(self, passthrough):
144 # the client node will shut down in a few seconds
145 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
146 log.msg("shutting down SystemTest services")
147 if self.keepalive_file and os.path.exists(self.keepalive_file):
148 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
149 log.msg("keepalive file at shutdown was %ds old" % age)
150 d = defer.succeed(None)
152 d.addCallback(lambda res: self.kill_client())
153 d.addCallback(lambda res: self.sparent.stopService())
154 d.addCallback(lambda res: eventual.flushEventualQueue())
155 def _close_statsfile(res):
156 self.statsfile.close()
157 d.addCallback(_close_statsfile)
158 d.addCallback(lambda res: passthrough)
161 def add_service(self, s):
162 s.setServiceParent(self.sparent)
165 def make_introducer(self):
166 iv_basedir = os.path.join(self.testdir, "introducer")
168 iv = introducer.IntroducerNode(basedir=iv_basedir)
169 self.introducer = self.add_service(iv)
170 d = self.introducer.when_tub_ready()
171 def _introducer_ready(res):
173 self.introducer_furl = q.introducer_url
174 d.addCallback(_introducer_ready)
177 def make_nodes(self):
179 for i in range(self.numnodes):
180 nodedir = os.path.join(self.testdir, "node%d" % i)
182 f = open(os.path.join(nodedir, "introducer.furl"), "w")
183 f.write(self.introducer_furl)
185 # the only tests for which we want the internal nodes to actually
186 # retain shares are the ones where somebody's going to download
188 if self.mode in ("download", "download-GET", "download-GET-slow"):
192 # for these tests, we tell the storage servers to pretend to
193 # accept shares, but really just throw them out, since we're
194 # only testing upload and not download.
195 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
196 f.write("no_storage\n")
198 if self.mode in ("receive",):
199 # for this mode, the client-under-test gets all the shares,
200 # so our internal nodes can refuse requests
201 f = open(os.path.join(nodedir, "sizelimit"), "w")
204 c = self.add_service(client.Client(basedir=nodedir))
206 # the peers will start running, eventually they will connect to each
207 # other and the introducer
209 def touch_keepalive(self):
210 if os.path.exists(self.keepalive_file):
211 age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
212 log.msg("touching keepalive file, was %ds old" % age)
213 f = open(self.keepalive_file, "w")
215 If the node notices this file at startup, it will poll every 5 seconds and
216 terminate if the file is more than 10 seconds old, or if it has been deleted.
217 If the test harness has an internal failure and neglects to kill off the node
218 itself, this helps to avoid leaving processes lying around. The contents of
219 this file are ignored.
223 def start_client(self):
224 # this returns a Deferred that fires with the client's control.furl
225 log.msg("MAKING CLIENT")
226 clientdir = self.clientdir = os.path.join(self.testdir, "client")
228 create_node.create_client(clientdir, {}, out=quiet)
229 log.msg("DONE MAKING CLIENT")
230 f = open(os.path.join(clientdir, "introducer.furl"), "w")
231 f.write(self.introducer_furl + "\n")
234 # set webport=0 and then ask the node what port it picked.
235 f = open(os.path.join(clientdir, "webport"), "w")
236 f.write("tcp:0:interface=127.0.0.1\n")
239 if self.mode in ("upload-self", "receive"):
240 # accept and store shares, to trigger the memory consumption bugs
243 # don't accept any shares
244 f = open(os.path.join(clientdir, "readonly_storage"), "w")
247 ## also, if we do receive any shares, throw them away
248 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
249 #f.write("no_storage\n")
251 if self.mode == "upload-self":
253 self.keepalive_file = os.path.join(clientdir,
254 "suicide_prevention_hotline")
255 # now start updating the mtime.
256 self.touch_keepalive()
257 ts = internet.TimerService(1.0, self.touch_keepalive)
258 ts.setServiceParent(self.sparent)
261 self.proc_done = pp.d = defer.Deferred()
262 logfile = os.path.join(self.basedir, "client.log")
263 cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
264 env = os.environ.copy()
265 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
266 log.msg("CLIENT STARTED")
268 # now we wait for the client to get started. we're looking for the
269 # control.furl file to appear.
270 furl_file = os.path.join(clientdir, "private", "control.furl")
271 url_file = os.path.join(clientdir, "node.url")
273 if pp.ended and pp.ended.value.status != 0:
274 # the twistd process ends normally (with rc=0) if the child
275 # is successfully launched. It ends abnormally (with rc!=0)
276 # if the child cannot be launched.
277 raise RuntimeError("process ended while waiting for startup")
278 return os.path.exists(furl_file)
279 d = self.poll(_check, 0.1)
280 # once it exists, wait a moment before we read from it, just in case
281 # it hasn't finished writing the whole thing. Ideally control.furl
282 # would be created in some atomic fashion, or made non-readable until
283 # it's ready, but I can't think of an easy way to do that, and I
284 # think the chances that we'll observe a half-write are pretty low.
286 d2 = defer.Deferred()
287 reactor.callLater(0.1, d2.callback, None)
289 d.addCallback(_stall)
291 # read the node's URL
292 self.webish_url = open(url_file, "r").read().strip()
293 if self.webish_url[-1] == "/":
294 # trim trailing slash, since the rest of the code wants it gone
295 self.webish_url = self.webish_url[:-1]
296 f = open(furl_file, "r")
303 def kill_client(self):
304 # returns a Deferred that fires when the process exits. This may only
307 self.proc.signalProcess("INT")
308 except error.ProcessExitedAlready:
310 return self.proc_done
313 def create_data(self, name, size):
314 filename = os.path.join(self.testdir, name + ".data")
315 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the 'VmPeak' entry of *stats* under the label *name*.

    One "<mode> <name>: <VmPeak>" line is appended to self.statsfile and
    flushed right away, and the same value is stored in self.stats[name].
    """
    peak = stats['VmPeak']
    record = "%s %s: %d\n" % (self.mode, name, peak)
    out = self.statsfile
    out.write(record)
    # flush immediately so the line reaches the file without waiting for
    # the statsfile to be closed
    out.flush()
    self.stats[name] = peak
328 def POST(self, urlpath, **fields):
329 url = self.webish_url + urlpath
330 sepbase = "boogabooga"
334 form.append('Content-Disposition: form-data; name="_charset"')
338 for name, value in fields.iteritems():
339 if isinstance(value, tuple):
340 filename, value = value
341 form.append('Content-Disposition: form-data; name="%s"; '
342 'filename="%s"' % (name, filename))
344 form.append('Content-Disposition: form-data; name="%s"' % name)
349 body = "\r\n".join(form) + "\r\n"
350 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
352 return tw_client.getPage(url, method="POST", postdata=body,
353 headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
    """Fetch *urlpath* from the client's web server via discardPage().

    The request URL is self.webish_url followed by *urlpath* and a fixed
    "?filename=dummy-get.out" query string; *stall* is handed to
    discardPage() unchanged, and its return value is returned as-is.
    """
    base = self.webish_url
    query = "?filename=dummy-get.out"
    target = base + urlpath + query
    return discardPage(target, stall)
359 def _print_usage(self, res=None):
360 d = self.control_rref.callRemote("get_memory_usage")
362 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
365 d.addCallback(_print)
368 def _do_upload(self, res, size, files, uris):
371 print "uploading %s" % name
372 if self.mode in ("upload", "upload-self"):
373 files[name] = self.create_data(name, size)
374 d = self.control_rref.callRemote("upload_from_file_to_uri",
375 files[name], convergence="check-memory convergence string")
377 os.remove(files[name])
381 elif self.mode == "upload-POST":
384 d = self.POST(url, t="upload", file=("%d.data" % size, data))
385 elif self.mode in ("receive",
386 "download", "download-GET", "download-GET-slow"):
387 # mode=receive: upload the data from a local peer, so that the
388 # client-under-test receives and stores the shares
390 # mode=download*: upload the data from a local peer, then have
391 # the client-under-test download it.
393 # we need to wait until the uploading node has connected to all
394 # peers, since the wait_for_client_connections() above doesn't
395 # pay attention to our self.nodes[] and their connections.
396 files[name] = self.create_data(name, size)
397 u = self.nodes[0].getServiceNamed("uploader")
398 d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
399 d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
400 d.addCallback(lambda results: results.uri)
402 raise RuntimeError("unknown mode=%s" % self.mode)
405 print "uploaded %s" % name
406 d.addCallback(_complete)
409 def _do_download(self, res, size, uris):
410 if self.mode not in ("download", "download-GET", "download-GET-slow"):
413 print "downloading %s" % name
416 if self.mode == "download":
417 d = self.control_rref.callRemote("download_from_uri_to_file",
419 elif self.mode == "download-GET":
420 url = "/uri/%s" % uri
421 d = self.GET_discard(urllib.quote(url), stall=False)
422 elif self.mode == "download-GET-slow":
423 url = "/uri/%s" % uri
424 d = self.GET_discard(urllib.quote(url), stall=True)
427 print "downloaded %s" % name
429 d.addCallback(_complete)
433 #print "CLIENT STARTED"
434 #print "FURL", self.control_furl
435 #print "RREF", self.control_rref
437 kB = 1000; MB = 1000*1000
441 d = self._print_usage()
442 d.addCallback(self.stash_stats, "0B")
445 d.addCallback(self._do_upload, 10*kB+i, files, uris)
446 d.addCallback(self._do_download, 10*kB+i, uris)
447 d.addCallback(self._print_usage)
448 d.addCallback(self.stash_stats, "10kB")
451 d.addCallback(self._do_upload, 10*MB+i, files, uris)
452 d.addCallback(self._do_download, 10*MB+i, uris)
453 d.addCallback(self._print_usage)
454 d.addCallback(self.stash_stats, "10MB")
457 d.addCallback(self._do_upload, 50*MB+i, files, uris)
458 d.addCallback(self._do_download, 50*MB+i, uris)
459 d.addCallback(self._print_usage)
460 d.addCallback(self.stash_stats, "50MB")
463 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
464 # d.addCallback(self._do_download, 100*MB+i, uris)
465 # d.addCallback(self._print_usage)
466 #d.addCallback(self.stash_stats, "100MB")
468 #d.addCallback(self.stall)
474 def stall(self, res):
476 reactor.callLater(5, d.callback, None)
480 class ClientWatcher(protocol.ProcessProtocol):
482 def outReceived(self, data):
484 def errReceived(self, data):
486 def processEnded(self, reason):
488 self.d.callback(None)
491 if __name__ == '__main__':
493 if len(sys.argv) > 1:
495 # put the logfile and stats.out in _test_memory/ . These stick around.
496 # put the nodes and other files in _test_memory/test/ . These are
497 # removed each time we run.
498 sf = SystemFramework("_test_memory", mode)