# tahoe-lafs: src/allmydata/test/check_memory.py
# (gitweb snapshot of commit 3ef32f947aec9545895de62c8ed591bb629f2715)
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
12 from allmydata.util.fileutil import abspath_expanduser_unicode
13 from allmydata.util.encodingutil import get_filesystem_encoding
14 from foolscap.api import Tub, fireEventually, flushEventualQueue
15 from twisted.python import log
16
17 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
18     full_speed_ahead = False
19     _bytes_so_far = 0
20     stalled = None
21     def handleResponsePart(self, data):
22         self._bytes_so_far += len(data)
23         if not self.factory.do_stall:
24             return
25         if self.full_speed_ahead:
26             return
27         if self._bytes_so_far > 1e6+100:
28             if not self.stalled:
29                 print "STALLING"
30                 self.transport.pauseProducing()
31                 self.stalled = reactor.callLater(10.0, self._resume_speed)
32     def _resume_speed(self):
33         print "RESUME SPEED"
34         self.stalled = None
35         self.full_speed_ahead = True
36         self.transport.resumeProducing()
37     def handleResponseEnd(self):
38         if self.stalled:
39             print "CANCEL"
40             self.stalled.cancel()
41             self.stalled = None
42         return tw_client.HTTPPageGetter.handleResponseEnd(self)
43
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    # same as the stock HTTPClientFactory, but using our stallable,
    # body-discarding getter as the protocol
    protocol = StallableHTTPGetterDiscarder
46
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    assert scheme == 'http'
    f = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    f.do_stall = stall
    reactor.connectTCP(host, port, f)
    return f.deferred
60
class ChildDidNotStartError(Exception):
    # raised by start_client's poller when the spawned twistd process exits
    # with a nonzero status before its control.furl file appears
    pass
63
class SystemFramework(pollmixin.PollMixin):
    # number of in-process server nodes created alongside the
    # client-under-test (see make_nodes)
    numnodes = 7
66
    def __init__(self, basedir, mode):
        """Prepare the harness directories and services.

        basedir -- directory for the driver log, stats.out, and the 'test'
                   subdir; must live under the current directory, since the
                   'test' subdir is rmtree'd on every run
        mode -- workload selector; the visible values used elsewhere in
                this file are "upload", "upload-self", "upload-POST",
                "receive", "download", "download-GET", "download-GET-slow"
        """
        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
        # safety check before the rmtree below: refuse to operate on a
        # directory outside the current working directory
        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        # parent MultiService: everything attached via add_service() is
        # started now and stopped during tearDown
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.proc = None  # child process transport, set in start_client()
        self.tub = Tub()
        self.tub.setOption("expose-remote-exception-types", False)
        self.tub.setServiceParent(self.sparent)
        self.mode = mode
        self.failed = False  # holds the Failure if the test errbacks
        self.keepalive_file = None  # path set in start_client()
84
    def run(self):
        """Drive the whole test under the reactor.

        Builds the setUp -> measure -> make_nodes -> wait -> do_test ->
        tearDown chain, runs the reactor until it completes, then re-raises
        any captured failure so the caller sees a traceback.
        """
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        # fireEventually() postpones the chain until the reactor is running
        d = fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        def _err(err):
            # stash the failure: we can only re-raise it after reactor.run()
            # returns, outside the reactor's own error handling
            self.failed = err
            log.err(err)
            print err
        d.addErrback(_err)
        def _done(res):
            reactor.stop()
            return res
        d.addBoth(_done)
        reactor.run()
        if self.failed:
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()
112
113     def setUp(self):
114         #print "STARTING"
115         self.stats = {}
116         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
117         d = self.make_introducer()
118         def _more(res):
119             return self.start_client()
120         d.addCallback(_more)
121         def _record_control_furl(control_furl):
122             self.control_furl = control_furl
123             #print "OBTAINING '%s'" % (control_furl,)
124             return self.tub.getReference(self.control_furl)
125         d.addCallback(_record_control_furl)
126         def _record_control(control_rref):
127             self.control_rref = control_rref
128         d.addCallback(_record_control)
129         def _ready(res):
130             #print "CLIENT READY"
131             pass
132         d.addCallback(_ready)
133         return d
134
135     def record_initial_memusage(self):
136         print
137         print "Client started (no connections yet)"
138         d = self._print_usage()
139         d.addCallback(self.stash_stats, "init")
140         return d
141
142     def wait_for_client_connected(self):
143         print
144         print "Client connecting to other nodes.."
145         return self.control_rref.callRemote("wait_for_client_connections",
146                                             self.numnodes+1)
147
    def tearDown(self, passthrough):
        """Shut down the child client and all local services.

        passthrough -- result (or Failure) from the preceding chain; it is
                       handed back unchanged once cleanup finishes, so an
                       upstream failure still reaches the errback.
        """
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        if self.proc:
            # only if start_client() actually spawned the child
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        d.addCallback(lambda res: passthrough)
        return d
165
    def add_service(self, s):
        # attach the service to our parent MultiService (starting it now,
        # and arranging for it to stop at tearDown), then hand it back
        s.setServiceParent(self.sparent)
        return s
169
170     def make_introducer(self):
171         iv_basedir = os.path.join(self.testdir, "introducer")
172         os.mkdir(iv_basedir)
173         iv = introducer.IntroducerNode(basedir=iv_basedir)
174         self.introducer = self.add_service(iv)
175         d = self.introducer.when_tub_ready()
176         def _introducer_ready(res):
177             q = self.introducer
178             self.introducer_furl = q.introducer_url
179         d.addCallback(_introducer_ready)
180         return d
181
182     def make_nodes(self):
183         self.nodes = []
184         for i in range(self.numnodes):
185             nodedir = os.path.join(self.testdir, "node%d" % i)
186             os.mkdir(nodedir)
187             f = open(os.path.join(nodedir, "introducer.furl"), "w")
188             f.write(self.introducer_furl)
189             f.close()
190             # the only tests for which we want the internal nodes to actually
191             # retain shares are the ones where somebody's going to download
192             # them.
193             if self.mode in ("download", "download-GET", "download-GET-slow"):
194                 # retain shares
195                 pass
196             else:
197                 # for these tests, we tell the storage servers to pretend to
198                 # accept shares, but really just throw them out, since we're
199                 # only testing upload and not download.
200                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
201                 f.write("no_storage\n")
202                 f.close()
203             if self.mode in ("receive",):
204                 # for this mode, the client-under-test gets all the shares,
205                 # so our internal nodes can refuse requests
206                 f = open(os.path.join(nodedir, "readonly_storage"), "w")
207                 f.write("\n")
208                 f.close()
209             c = self.add_service(client.Client(basedir=nodedir))
210             self.nodes.append(c)
211         # the peers will start running, eventually they will connect to each
212         # other and the introducer
213
    def touch_keepalive(self):
        """Refresh the mtime of the client's keepalive file (recreating it
        with explanatory contents), logging how stale it had become."""
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
        """)
        f.close()
227
228     def start_client(self):
229         # this returns a Deferred that fires with the client's control.furl
230         log.msg("MAKING CLIENT")
231         # self.testdir is an absolute Unicode path
232         clientdir = self.clientdir = os.path.join(self.testdir, u"client")
233         clientdir_str = clientdir.encode(get_filesystem_encoding())
234         quiet = StringIO()
235         create_node.create_node(clientdir, {}, out=quiet)
236         log.msg("DONE MAKING CLIENT")
237         f = open(os.path.join(clientdir, "introducer.furl"), "w")
238         f.write(self.introducer_furl + "\n")
239         f.close()
240
241         # set webport=0 and then ask the node what port it picked.
242         f = open(os.path.join(clientdir, "webport"), "w")
243         f.write("tcp:0:interface=127.0.0.1\n")
244         f.close()
245
246         if self.mode in ("upload-self", "receive"):
247             # accept and store shares, to trigger the memory consumption bugs
248             pass
249         else:
250             # don't accept any shares
251             f = open(os.path.join(clientdir, "readonly_storage"), "w")
252             f.write("true\n")
253             f.close()
254             ## also, if we do receive any shares, throw them away
255             #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
256             #f.write("no_storage\n")
257             #f.close()
258         if self.mode == "upload-self":
259             pass
260         self.keepalive_file = os.path.join(clientdir,
261                                            "suicide_prevention_hotline")
262         # now start updating the mtime.
263         self.touch_keepalive()
264         ts = internet.TimerService(1.0, self.touch_keepalive)
265         ts.setServiceParent(self.sparent)
266
267         pp = ClientWatcher()
268         self.proc_done = pp.d = defer.Deferred()
269         logfile = os.path.join(self.basedir, "client.log")
270         cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
271         env = os.environ.copy()
272         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
273         log.msg("CLIENT STARTED")
274
275         # now we wait for the client to get started. we're looking for the
276         # control.furl file to appear.
277         furl_file = os.path.join(clientdir, "private", "control.furl")
278         url_file = os.path.join(clientdir, "node.url")
279         def _check():
280             if pp.ended and pp.ended.value.status != 0:
281                 # the twistd process ends normally (with rc=0) if the child
282                 # is successfully launched. It ends abnormally (with rc!=0)
283                 # if the child cannot be launched.
284                 raise ChildDidNotStartError("process ended while waiting for startup")
285             return os.path.exists(furl_file)
286         d = self.poll(_check, 0.1)
287         # once it exists, wait a moment before we read from it, just in case
288         # it hasn't finished writing the whole thing. Ideally control.furl
289         # would be created in some atomic fashion, or made non-readable until
290         # it's ready, but I can't think of an easy way to do that, and I
291         # think the chances that we'll observe a half-write are pretty low.
292         def _stall(res):
293             d2 = defer.Deferred()
294             reactor.callLater(0.1, d2.callback, None)
295             return d2
296         d.addCallback(_stall)
297         def _read(res):
298             # read the node's URL
299             self.webish_url = open(url_file, "r").read().strip()
300             if self.webish_url[-1] == "/":
301                 # trim trailing slash, since the rest of the code wants it gone
302                 self.webish_url = self.webish_url[:-1]
303             f = open(furl_file, "r")
304             furl = f.read()
305             return furl.strip()
306         d.addCallback(_read)
307         return d
308
309
    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        try:
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
            # the child is already gone; proc_done fires regardless
            pass
        return self.proc_done
318
319
320     def create_data(self, name, size):
321         filename = os.path.join(self.testdir, name + ".data")
322         f = open(filename, "wb")
323         block = "a" * 8192
324         while size > 0:
325             l = min(size, 8192)
326             f.write(block[:l])
327             size -= l
328         return filename
329
330     def stash_stats(self, stats, name):
331         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
332         self.statsfile.flush()
333         self.stats[name] = stats['VmPeak']
334
335     def POST(self, urlpath, **fields):
336         url = self.webish_url + urlpath
337         sepbase = "boogabooga"
338         sep = "--" + sepbase
339         form = []
340         form.append(sep)
341         form.append('Content-Disposition: form-data; name="_charset"')
342         form.append('')
343         form.append('UTF-8')
344         form.append(sep)
345         for name, value in fields.iteritems():
346             if isinstance(value, tuple):
347                 filename, value = value
348                 form.append('Content-Disposition: form-data; name="%s"; '
349                             'filename="%s"' % (name, filename))
350             else:
351                 form.append('Content-Disposition: form-data; name="%s"' % name)
352             form.append('')
353             form.append(value)
354             form.append(sep)
355         form[-1] += "--"
356         body = "\r\n".join(form) + "\r\n"
357         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
358                    }
359         return tw_client.getPage(url, method="POST", postdata=body,
360                                  headers=headers, followRedirect=False)
361
362     def GET_discard(self, urlpath, stall):
363         url = self.webish_url + urlpath + "?filename=dummy-get.out"
364         return discardPage(url, stall)
365
366     def _print_usage(self, res=None):
367         d = self.control_rref.callRemote("get_memory_usage")
368         def _print(stats):
369             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
370                                                 stats["VmPeak"])
371             return stats
372         d.addCallback(_print)
373         return d
374
    def _do_upload(self, res, size, files, uris):
        """Upload `size` bytes via the path selected by self.mode.

        res -- ignored chained result from the previous callback
        size -- number of bytes to upload; also used as the name/dict key
        files -- dict mapping name -> local data file path (mutated)
        uris -- dict mapping name -> resulting URI (mutated by _complete)
        Returns a Deferred that fires when the upload is done.
        """
        name = '%d' % size
        print
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            # ask the client-under-test (via its control port) to upload a
            # local file; delete the file once we have the URI
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name], convergence="check-memory convergence string")
            def _done(uri):
                os.remove(files[name])
                del files[name]
                return uri
            d.addCallback(_done)
        elif self.mode == "upload-POST":
            # push the bytes through the client's web API instead
            data = "a" * size
            url = "/uri"
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            #
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            #
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
            d.addCallback(lambda results: results.uri)
        else:
            raise ValueError("unknown mode=%s" % self.mode)
        def _complete(uri):
            uris[name] = uri
            print "uploaded %s" % name
        d.addCallback(_complete)
        return d
415
    def _do_download(self, res, size, uris):
        """Download the previously-uploaded `size`-byte file.

        A no-op (returns None) unless self.mode is one of the download*
        modes; otherwise returns a Deferred that fires when the download
        completes.
        """
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            return
        name = '%d' % size
        print "downloading %s" % name
        uri = uris[name]

        if self.mode == "download":
            # fetch via the control port, writing to a scratch file
            d = self.control_rref.callRemote("download_from_uri_to_file",
                                             uri, "dummy.out")
        elif self.mode == "download-GET":
            # fetch via the web API, discarding the body
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            # same, but stall the pipe after ~1MB (see discardPage)
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)

        def _complete(res):
            print "downloaded %s" % name
            return res
        d.addCallback(_complete)
        return d
438
439     def do_test(self):
440         #print "CLIENT STARTED"
441         #print "FURL", self.control_furl
442         #print "RREF", self.control_rref
443         #print
444         kB = 1000; MB = 1000*1000
445         files = {}
446         uris = {}
447
448         d = self._print_usage()
449         d.addCallback(self.stash_stats, "0B")
450
451         for i in range(10):
452             d.addCallback(self._do_upload, 10*kB+i, files, uris)
453             d.addCallback(self._do_download, 10*kB+i, uris)
454             d.addCallback(self._print_usage)
455         d.addCallback(self.stash_stats, "10kB")
456
457         for i in range(3):
458             d.addCallback(self._do_upload, 10*MB+i, files, uris)
459             d.addCallback(self._do_download, 10*MB+i, uris)
460             d.addCallback(self._print_usage)
461         d.addCallback(self.stash_stats, "10MB")
462
463         for i in range(1):
464             d.addCallback(self._do_upload, 50*MB+i, files, uris)
465             d.addCallback(self._do_download, 50*MB+i, uris)
466             d.addCallback(self._print_usage)
467         d.addCallback(self.stash_stats, "50MB")
468
469         #for i in range(1):
470         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
471         #    d.addCallback(self._do_download, 100*MB+i, uris)
472         #    d.addCallback(self._print_usage)
473         #d.addCallback(self.stash_stats, "100MB")
474
475         #d.addCallback(self.stall)
476         def _done(res):
477             print "FINISHING"
478         d.addCallback(_done)
479         return d
480
    def stall(self, res):
        # pause the callback chain for five seconds (the chained result is
        # not propagated; the Deferred fires with None)
        d = defer.Deferred()
        reactor.callLater(5, d.callback, None)
        return d
485
486
class ClientWatcher(protocol.ProcessProtocol):
    # ProcessProtocol for the spawned twistd client: echoes the child's
    # stdout/stderr to our stdout, and fires self.d (assigned by the
    # spawner) when the process ends.
    # `ended` is False until the process ends, then holds the termination
    # reason (a Failure) — see the status check in start_client._check.
    ended = False
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        self.ended = reason
        self.d.callback(None)
496
497
if __name__ == '__main__':
    # usage: check_memory.py [mode]; mode defaults to "upload"
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()
507