3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
12 from foolscap import eventual
13 from twisted.python import log
# HTTPPageGetter subclass used by discardPage(): it tallies the response
# bytes it is handed and can pause its transport, resuming via a 10-second
# reactor timer.
# NOTE(review): this view is incomplete -- the embedded numbering skips
# 17-18, 22, 24, 26-27, 31-32 and 36-39, so the statements under the bare
# `if` lines below (and any class-level defaults for _bytes_so_far and
# stalled) are not shown. Confirm against the full file before editing.
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
# Class-level default; _resume_speed() sets it to True on the instance.
16 full_speed_ahead = False
# Receives each chunk of the response body and adds its length to the tally.
19 def handleResponsePart(self, data):
20 self._bytes_so_far += len(data)
# do_stall is assigned on the factory by discardPage().
21 if not self.factory.do_stall:
23 if self.full_speed_ahead:
# Once more than 1e6+100 bytes have arrived, pause the transport and
# schedule _resume_speed() to run 10 seconds later.
25 if self._bytes_so_far > 1e6+100:
28 self.transport.pauseProducing()
29 self.stalled = reactor.callLater(10.0, self._resume_speed)
# Timer callback: records that the stall is over and lets data flow again.
30 def _resume_speed(self):
33 self.full_speed_ahead = True
34 self.transport.resumeProducing()
# Delegates to the base class at the end of the response.
# NOTE(review): lines 36-39 are not shown; presumably they deal with a
# still-pending self.stalled timer -- confirm.
35 def handleResponseEnd(self):
40 return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose connections speak StallableHTTPGetterDiscarder."""

    # Protocol class instantiated for each connection made by this factory.
    protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, optionally stalling the pipe part-way through.

    When `stall` is true the connection is paused after the first 1MB,
    left idle for 10 seconds, and then allowed to download (and discard)
    the remainder. Extra positional and keyword arguments are handed to the
    client factory unchanged.

    Returns the factory's Deferred.
    """
    # Modelled on twisted.web.client.getPage, which cannot simply be wrapped
    # or subclassed because it offers no way to substitute the
    # HTTPClientFactory it creates.
    scheme, host, port, _path = tw_client._parse(url)
    page_factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    page_factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, page_factory)
    return page_factory.deferred
# Harness that prepares a scratch directory, an in-process service tree and
# a foolscap Tub for a memory-usage test run.
# NOTE(review): the embedded numbering skips 60-61 and 72, and jumps from 74
# to 79 after this section, so class-level attributes and further
# assignments (other methods read self.mode, self.numnodes, self.failed)
# are presumably set in lines not shown here -- confirm.
59 class SystemFramework(testutil.PollMixin):
# basedir: directory under which the "test" scratch directory is created.
# mode: test mode selector (not stored in the lines visible here).
62 def __init__(self, basedir, mode):
63 self.basedir = basedir = os.path.abspath(basedir)
# Safety guard, since testdir below is removed with rmtree.
# NOTE(review): this is a plain string-prefix test, so a sibling directory
# whose name merely starts with the cwd path would also pass.
64 if not basedir.startswith(os.path.abspath(".")):
65 raise AssertionError("safety issue: basedir must be a subdir")
66 self.testdir = testdir = os.path.join(basedir, "test")
# Start every run from an empty test directory.
67 if os.path.exists(testdir):
68 shutil.rmtree(testdir)
69 fileutil.make_dirs(testdir)
# Parent for every in-process service; started right away.
70 self.sparent = service.MultiService()
71 self.sparent.startService()
# Tub used later to obtain a reference to the client's control object.
73 self.tub = foolscap.Tub()
74 self.tub.setServiceParent(self.sparent)
# NOTE(review): the `def` line for this section is not visible (numbering
# jumps from 74 to 79). It looks like the method that drives a whole run --
# confirm its name and signature against the full file.
# Twisted log output goes to <testdir>/log; the continuation of this call
# (line 80) is not shown.
79 log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
81 #logfile = open(os.path.join(self.testdir, "log"), "w")
82 #flo = log.FileLogObserver(logfile)
83 #log.startLoggingWithObserver(flo.emit, setStdout=False)
# Chain setUp -> do_test on the Deferred from eventual.fireEventually();
# tearDown is attached with addBoth so it runs on success and on failure.
84 d = eventual.fireEventually()
85 d.addCallback(lambda res: self.setUp())
86 d.addCallback(lambda res: self.do_test())
87 d.addBoth(self.tearDown)
# NOTE(review): lines 88-98 are not shown; self.failed is presumably a
# Failure captured by an errback there and re-raised here -- confirm.
99 self.failed.raiseException()
# NOTE(review): the `def` line for this section is not visible (numbering
# jumps from 99 to 104). The run section calls self.setUp(), which this
# presumably is -- confirm.
# Opened in append mode, so stats from earlier runs stay in stats.out.
104 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
105 d = self.make_introducer_and_vdrive()
# NOTE(review): lines 106-107 and 109 are not shown; this return presumably
# sits inside a callback that is then added to d.
108 return self.start_client()
# start_client() fires with the client's control furl: remember it and ask
# the Tub for a reference to it.
110 def _record_control_furl(control_furl):
111 self.control_furl = control_furl
112 #print "OBTAINING '%s'" % (control_furl,)
113 return self.tub.getReference(self.control_furl)
114 d.addCallback(_record_control_furl)
# Keep the remote reference, then invoke "wait_for_client_connections" on
# it (the call's remaining arguments, line 118, are not shown).
115 def _record_control(control_rref):
116 self.control_rref = control_rref
117 return control_rref.callRemote("wait_for_client_connections",
119 d.addCallback(_record_control)
# NOTE(review): the definition of _ready (lines 120, 122) is not shown.
121 #print "CLIENT READY"
123 d.addCallback(_ready)
# Shutdown sequence, run as callbacks on an already-fired Deferred: kill the
# client process, stop the in-process services, flush foolscap's eventual
# queue, close the stats file, and finally hand back `passthrough` so that
# whatever result or failure reached tearDown continues down the chain.
# NOTE(review): lines 131 and 139 are not shown; the method presumably ends
# by returning d -- confirm.
126 def tearDown(self, passthrough):
127 # the client node will shut down in a few seconds
128 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
129 log.msg("shutting down SystemTest services")
130 d = defer.succeed(None)
132 d.addCallback(lambda res: self.kill_client())
133 d.addCallback(lambda res: self.sparent.stopService())
134 d.addCallback(lambda res: eventual.flushEventualQueue())
135 def _close_statsfile(res):
136 self.statsfile.close()
137 d.addCallback(_close_statsfile)
138 d.addCallback(lambda res: passthrough)
# Attach service `s` to the harness's parent MultiService.
# NOTE(review): line 143 is not shown; callers assign the result of
# add_service(), so it presumably returns s -- confirm.
141 def add_service(self, s):
142 s.setServiceParent(self.sparent)
# Create the IntroducerAndVdrive service under
# <testdir>/introducer_and_vdrive, attach it to the service tree, and wait
# on its when_tub_ready() Deferred.
# NOTE(review): lines 147 and 151 are not shown (presumably creating the
# directory and returning d) -- confirm.
145 def make_introducer_and_vdrive(self):
146 iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
148 iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
149 self.introducer_and_vdrive = self.add_service(iv)
150 d = self.introducer_and_vdrive.when_tub_ready()
# Create self.numnodes in-process client nodes, each in its own
# <testdir>/node<i> directory seeded with the introducer and vdrive furls
# plus mode-dependent marker files.
# NOTE(review): several lines are not shown (157, 160, 163, 166, 169,
# 171-173, 179, 184-185, 187) -- presumably directory creation, file
# closes, the else/pass branches and bookkeeping of the created nodes.
# Confirm before relying on the control flow as it appears here.
153 def make_nodes(self):
# Both furls are read from the introducer_and_vdrive service's urls mapping.
154 q = self.introducer_and_vdrive
155 self.introducer_furl = q.urls["introducer"]
156 self.vdrive_furl = q.urls["vdrive"]
158 for i in range(self.numnodes):
159 nodedir = os.path.join(self.testdir, "node%d" % i)
161 f = open(os.path.join(nodedir, "introducer.furl"), "w")
162 f.write(self.introducer_furl)
164 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
165 f.write(self.vdrive_furl)
167 # the only tests for which we want the internal nodes to actually
168 # retain shares are the ones where somebody's going to download
170 if self.mode in ("download", "download-GET", "download-GET-slow"):
174 # for these tests, we tell the storage servers to pretend to
175 # accept shares, but really just throw them out, since we're
176 # only testing upload and not download.
177 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
178 f.write("no_storage\n")
180 if self.mode in ("receive",):
181 # for this mode, the client-under-test gets all the shares,
182 # so our internal nodes can refuse requests
183 f = open(os.path.join(nodedir, "sizelimit"), "w")
# The Client service is created in-process and attached to the service tree.
186 c = self.add_service(client.Client(basedir=nodedir))
188 # the peers will start running, eventually they will connect to each
189 # other and the introducer_and_vdrive
# Rewrite self.keepalive_file, opened in "w" mode. start_client() runs this
# every 4 seconds from a TimerService.
# NOTE(review): lines 193 and 199-201 are not shown; the prose lines below
# are presumably the body of a string literal written to the file, so they
# are runtime text and must not be edited as if they were comments.
191 def touch_keepalive(self):
192 f = open(self.keepalive_file, "w")
194 If the node notices this file at startup, it will poll every 5 seconds and
195 terminate if the file is more than 10 seconds old, or if it has been deleted.
196 If the test harness has an internal failure and neglects to kill off the node
197 itself, this helps to avoid leaving processes lying around. The contents of
198 this file are ignored.
# Build the client-under-test in <testdir>/client, write its configuration
# files, launch it as a separate `twistd` process, and wait for its
# control.furl file to appear.
# NOTE(review): many lines are not shown (206, 211, 214, 221, 225-226,
# 229-230, 234, 238, 245-246, 253, 257, 270, 273, 275, 277-282) --
# presumably `quiet`/`pp` setup, file closes, else branches, the `def`
# lines of _check/_stall and the final read/return. Confirm before
# changing the flow.
202 def start_client(self):
203 # this returns a Deferred that fires with the client's control.furl
204 log.msg("MAKING CLIENT")
205 clientdir = self.clientdir = os.path.join(self.testdir, "client")
# `quiet` is defined on a line not shown (206).
207 create_node.create_client(clientdir, {}, out=quiet)
208 log.msg("DONE MAKING CLIENT")
209 f = open(os.path.join(clientdir, "introducer.furl"), "w")
210 f.write(self.introducer_furl + "\n")
212 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
213 f.write(self.vdrive_furl + "\n")
215 f = open(os.path.join(clientdir, "webport"), "w")
216 # TODO: ideally we would set webport=0 and then ask the node what
217 # port it picked. But at the moment it is not convenient to do this,
218 # so we just pick a relatively unique one.
# NOTE(review): os.getpid() can exceed 65535 on systems configured with a
# large pid range, which would not be a valid TCP port -- verify.
219 webport = max(os.getpid(), 2000)
220 f.write("tcp:%d:interface=127.0.0.1\n" % webport)
# Base URL used by POST() and GET_discard().
222 self.webish_url = "http://localhost:%d" % webport
223 if self.mode in ("upload-self", "receive"):
224 # accept and store shares, to trigger the memory consumption bugs
227 # don't accept any shares
228 f = open(os.path.join(clientdir, "sizelimit"), "w")
231 ## also, if we do receive any shares, throw them away
232 #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
233 #f.write("no_storage\n")
235 if self.mode == "upload-self":
236 f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
237 f.write("push_to_ourselves\n")
239 self.keepalive_file = os.path.join(clientdir,
240 "suicide_prevention_hotline")
241 # now start updating the mtime.
# Touch once immediately, then every 4 seconds for as long as the parent
# service runs.
242 self.touch_keepalive()
243 ts = internet.TimerService(4.0, self.touch_keepalive)
244 ts.setServiceParent(self.sparent)
# `pp` is created on a line not shown (245-246); ClientWatcher.processEnded
# fires its `d`, which kill_client() returns as self.proc_done.
247 self.proc_done = pp.d = defer.Deferred()
248 logfile = os.path.join(self.basedir, "client.log")
249 cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
250 env = os.environ.copy()
251 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
252 log.msg("CLIENT STARTED")
254 # now we wait for the client to get started. we're looking for the
255 # control.furl file to appear.
256 furl_file = os.path.join(clientdir, "control.furl")
# Body of the polling predicate `_check` (its def line, 257, is not shown).
258 if pp.ended and pp.ended.value.status != 0:
259 # the twistd process ends normally (with rc=0) if the child
260 # is successfully launched. It ends abnormally (with rc!=0)
261 # if the child cannot be launched.
262 raise RuntimeError("process ended while waiting for startup")
263 return os.path.exists(furl_file)
# Poll the predicate with a 0.1 interval argument.
264 d = self.poll(_check, 0.1)
265 # once it exists, wait a moment before we read from it, just in case
266 # it hasn't finished writing the whole thing. Ideally control.furl
267 # would be created in some atomic fashion, or made non-readable until
268 # it's ready, but I can't think of an easy way to do that, and I
269 # think the chances that we'll observe a half-write are pretty low.
# Body of `_stall` (def line, 270, not shown): a Deferred fired 0.1s later.
271 d2 = defer.Deferred()
272 reactor.callLater(0.1, d2.callback, None)
274 d.addCallback(_stall)
276 f = open(furl_file, "r")
# Send SIGINT to the spawned client process and return self.proc_done.
# A process that has already exited is tolerated (ProcessExitedAlready is
# caught).
# NOTE(review): lines 285-286 and 289 are not shown -- presumably the rest
# of the comment, the `try:` and the except body -- confirm.
283 def kill_client(self):
284 # returns a Deferred that fires when the process exits. This may only
287 self.proc.signalProcess("INT")
288 except error.ProcessExitedAlready:
290 return self.proc_done
# Open <testdir>/<name>.data for binary writing.
# NOTE(review): lines 296-301 are not shown. Callers store the return value
# in files[name] and later pass it to os.remove(), so this presumably
# writes `size` bytes and returns the filename -- confirm.
293 def create_data(self, name, size):
294 filename = os.path.join(self.testdir, name + ".data")
295 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the VmPeak figure from `stats` under the label `name`.

    A line of the form "<mode> <name>: <VmPeak>" is written to
    self.statsfile and flushed straight away; the same figure is also kept
    in self.stats[name].
    """
    peak = stats['VmPeak']
    record = "%s %s: %d\n" % (self.mode, name, peak)
    out = self.statsfile
    out.write(record)
    out.flush()
    self.stats[name] = peak
# Build a multipart/form-data body from `fields` and POST it to
# self.webish_url + urlpath via twisted's getPage, without following
# redirects. A field whose value is a tuple is unpacked as
# (filename, value) and its part header carries a filename attribute.
# NOTE(review): lines 311-313, 315-317, 323, 325-328 and 331 are not shown
# -- presumably the `form` list setup, boundary lines, the else branch, the
# per-field value appends and the rest of the headers dict. Confirm.
308 def POST(self, urlpath, **fields):
309 url = self.webish_url + urlpath
# Fixed multipart boundary string; also placed in the content-type header.
310 sepbase = "boogabooga"
314 form.append('Content-Disposition: form-data; name="_charset"')
318 for name, value in fields.iteritems():
319 if isinstance(value, tuple):
320 filename, value = value
321 form.append('Content-Disposition: form-data; name="%s"; '
322 'filename="%s"' % (name, filename))
324 form.append('Content-Disposition: form-data; name="%s"' % name)
# Parts are joined with CRLF line endings.
329 body = "\r\n".join(form) + "\r\n"
330 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
332 return tw_client.getPage(url, method="POST", postdata=body,
333 headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
    """Fetch `urlpath` from the client's web port and discard the body.

    The request URL is self.webish_url + urlpath with a fixed
    "?filename=dummy-get.out" query appended; `stall` is forwarded to
    discardPage(), whose return value is returned.
    """
    query = "?filename=dummy-get.out"
    target = self.webish_url + urlpath + query
    return discardPage(target, stall)
# Ask the client's control object for its memory usage and print the VmSize
# and VmPeak figures. `res` is ignored, so this can be used directly as a
# Deferred callback.
# NOTE(review): lines 341, 343-344 and 346 are not shown. do_test feeds the
# result to stash_stats, so the inner _print presumably returns `stats` and
# the method returns d -- confirm.
339 def _print_usage(self, res=None):
340 d = self.control_rref.callRemote("get_memory_usage")
342 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
345 d.addCallback(_print)
# Upload one file of `size` bytes by whichever route self.mode selects:
#   upload / upload-self: the client-under-test uploads a local file via
#       its control object;
#   upload-POST: the data is POSTed to the client's web port;
#   receive and the download modes: an in-process node (self.nodes[0])
#       performs the upload.
# Any other mode raises RuntimeError.
# NOTE(review): lines 349-350, 355-356, 358-360, 362, 377, 379-380 and
# 383-384 are not shown -- presumably the definition of `name`, the
# remaining callRemote arguments, the cleanup callback around os.remove,
# the `data` setup, the `else:`, the def line of _complete and the final
# return. Confirm.
348 def _do_upload(self, res, size, files, uris):
351 print "uploading %s" % name
352 if self.mode in ("upload", "upload-self"):
353 files[name] = self.create_data(name, size)
354 d = self.control_rref.callRemote("upload_from_file_to_uri",
357 os.remove(files[name])
361 elif self.mode == "upload-POST":
363 url = "/vdrive/global"
364 d = self.POST(url, t="upload", file=("%d.data" % size, data))
365 elif self.mode in ("receive",):
366 # upload the data from a local peer, so that the
367 # client-under-test receives and stores the shares
368 files[name] = self.create_data(name, size)
369 u = self.nodes[0].getServiceNamed("uploader")
370 d = u.upload_filename(files[name])
371 elif self.mode in ("download", "download-GET", "download-GET-slow"):
372 # upload the data from a local peer, then have the
373 # client-under-test download it.
374 files[name] = self.create_data(name, size)
375 u = self.nodes[0].getServiceNamed("uploader")
376 d = u.upload_filename(files[name])
378 raise RuntimeError("unknown mode=%s" % self.mode)
381 print "uploaded %s" % name
382 d.addCallback(_complete)
# For the download modes only, have the client-under-test fetch the data:
#   download: through its control object ("download_from_uri_to_file");
#   download-GET: HTTP GET of /uri/<uri>, body discarded;
#   download-GET-slow: the same GET with stall=True.
# NOTE(review): lines 387-388, 390-391, 394, 401-402, 404 and 406 are not
# shown -- presumably the early return for other modes, the `name`/`uri`
# lookups, the remaining callRemote arguments, the def line of _complete
# and the final return. Confirm.
385 def _do_download(self, res, size, uris):
386 if self.mode not in ("download", "download-GET", "download-GET-slow"):
389 print "downloading %s" % name
392 if self.mode == "download":
393 d = self.control_rref.callRemote("download_from_uri_to_file",
# The URL path is percent-quoted before being handed to GET_discard().
395 elif self.mode == "download-GET":
396 url = "/uri/%s" % uri
397 d = self.GET_discard(urllib.quote(url), stall=False)
398 elif self.mode == "download-GET-slow":
399 url = "/uri/%s" % uri
400 d = self.GET_discard(urllib.quote(url), stall=True)
403 print "downloaded %s" % name
405 d.addCallback(_complete)
# NOTE(review): the `def` line for this section is not visible (numbering
# jumps from 405 to 409). The run section calls self.do_test(), which this
# presumably is -- confirm.
# Measurement sequence: record a baseline ("0B"), then for the 10kB, 10MB
# and 50MB sizes run upload -> download -> memory report and stash the
# VmPeak under the size's label. The 100MB stage is commented out.
# NOTE(review): lines 412, 414-416, 419-420, 425-426, 431-432, 437-438 and
# 445-449 are not shown; `i`, `files` and `uris` are defined there,
# presumably with a loop around each stage -- confirm.
409 #print "CLIENT STARTED"
410 #print "FURL", self.control_furl
411 #print "RREF", self.control_rref
# Decimal units: 1 kB = 1000 bytes, 1 MB = 1000*1000 bytes.
413 kB = 1000; MB = 1000*1000
417 d = self._print_usage()
418 d.addCallback(self.stash_stats, "0B")
# size+i presumably gives each iteration a distinct file size -- confirm.
421 d.addCallback(self._do_upload, 10*kB+i, files, uris)
422 d.addCallback(self._do_download, 10*kB+i, uris)
423 d.addCallback(self._print_usage)
424 d.addCallback(self.stash_stats, "10kB")
427 d.addCallback(self._do_upload, 10*MB+i, files, uris)
428 d.addCallback(self._do_download, 10*MB+i, uris)
429 d.addCallback(self._print_usage)
430 d.addCallback(self.stash_stats, "10MB")
433 d.addCallback(self._do_upload, 50*MB+i, files, uris)
434 d.addCallback(self._do_download, 50*MB+i, uris)
435 d.addCallback(self._print_usage)
436 d.addCallback(self.stash_stats, "50MB")
439 # d.addCallback(self._do_upload, 100*MB+i, files, uris)
440 # d.addCallback(self._do_download, 100*MB+i, uris)
441 # d.addCallback(self._print_usage)
442 #d.addCallback(self.stash_stats, "100MB")
444 #d.addCallback(self.stall)
# Callback helper that schedules d.callback(None) to run 5 seconds later.
# NOTE(review): lines 451 and 453 are not shown; `d` is presumably a fresh
# Deferred created and returned there -- confirm.
450 def stall(self, res):
452 reactor.callLater(5, d.callback, None)
# ProcessProtocol attached to the spawned client process. start_client()
# assigns the `d` Deferred, which is fired with None when the process ends.
# NOTE(review): lines 457, 459, 461 and 463 are not shown. start_client()
# reads pp.ended and pp.ended.value.status, so `ended` is presumably a
# class default that processEnded replaces with `reason`, and the
# out/err handlers presumably do nothing -- confirm.
456 class ClientWatcher(protocol.ProcessProtocol):
458 def outReceived(self, data):
460 def errReceived(self, data):
462 def processEnded(self, reason):
464 self.d.callback(None)
# Script entry point: build a SystemFramework rooted at ./_test_memory.
# NOTE(review): lines 468, 470 and 475 onwards are not shown; `mode` is
# presumably a default overridden by sys.argv[1], and the framework is
# presumably started after construction -- confirm.
467 if __name__ == '__main__':
469 if len(sys.argv) > 1:
471 # put the logfile and stats.out in _test_memory/ . These stick around.
472 # put the nodes and other files in _test_memory/test/ . These are
473 # removed each time we run.
474 sf = SystemFramework("_test_memory", mode)