# git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git - src/allmydata/test/check_memory.py
# tests: fix check_memory test
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer
9 from allmydata.immutable import upload
10 from allmydata.scripts import create_node
11 from allmydata.util import fileutil, pollmixin
12 from allmydata.util.fileutil import abspath_expanduser_unicode
13 from allmydata.util.encodingutil import get_filesystem_encoding
14 from foolscap.api import Tub, fireEventually, flushEventualQueue
15 from twisted.python import log
16
17 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
18     full_speed_ahead = False
19     _bytes_so_far = 0
20     stalled = None
21     def handleResponsePart(self, data):
22         self._bytes_so_far += len(data)
23         if not self.factory.do_stall:
24             return
25         if self.full_speed_ahead:
26             return
27         if self._bytes_so_far > 1e6+100:
28             if not self.stalled:
29                 print "STALLING"
30                 self.transport.pauseProducing()
31                 self.stalled = reactor.callLater(10.0, self._resume_speed)
32     def _resume_speed(self):
33         print "RESUME SPEED"
34         self.stalled = None
35         self.full_speed_ahead = True
36         self.transport.resumeProducing()
37     def handleResponseEnd(self):
38         if self.stalled:
39             print "CANCEL"
40             self.stalled.cancel()
41             self.stalled = None
42         return tw_client.HTTPPageGetter.handleResponseEnd(self)
43
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose protocol discards the fetched body and can
    stall the pipe partway through (see StallableHTTPGetterDiscarder)."""
    protocol = StallableHTTPGetterDiscarder
46
def discardPage(url, stall=False, *args, **kwargs):
    """Begin fetching *url*, discarding the body as it arrives.

    If *stall* is true, the pipe is paused after roughly the first 1MB
    and resumed 10 seconds later.  Returns the factory's Deferred.
    """
    # Adapted from twisted.web.client.getPage: that helper offers no way
    # to substitute our own HTTPClientFactory, so we inline its guts.
    scheme, host, port, _path = tw_client._parse(url)
    assert scheme == 'http'
    f = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    f.do_stall = stall
    reactor.connectTCP(host, port, f)
    return f.deferred
60
class ChildDidNotStartError(Exception):
    """Raised when the spawned client process exits (rc != 0) before its
    control.furl file appears, i.e. it never started successfully."""
    pass
63
64 class SystemFramework(pollmixin.PollMixin):
65     numnodes = 7
66
67     def __init__(self, basedir, mode):
68         self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
69         if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
70             raise AssertionError("safety issue: basedir must be a subdir")
71         self.testdir = testdir = os.path.join(basedir, "test")
72         if os.path.exists(testdir):
73             shutil.rmtree(testdir)
74         fileutil.make_dirs(testdir)
75         self.sparent = service.MultiService()
76         self.sparent.startService()
77         self.proc = None
78         self.tub = Tub()
79         self.tub.setOption("expose-remote-exception-types", False)
80         self.tub.setServiceParent(self.sparent)
81         self.mode = mode
82         self.failed = False
83         self.keepalive_file = None
84
85     def run(self):
86         framelog = os.path.join(self.basedir, "driver.log")
87         log.startLogging(open(framelog, "a"), setStdout=False)
88         log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
89         #logfile = open(os.path.join(self.testdir, "log"), "w")
90         #flo = log.FileLogObserver(logfile)
91         #log.startLoggingWithObserver(flo.emit, setStdout=False)
92         d = fireEventually()
93         d.addCallback(lambda res: self.setUp())
94         d.addCallback(lambda res: self.record_initial_memusage())
95         d.addCallback(lambda res: self.make_nodes())
96         d.addCallback(lambda res: self.wait_for_client_connected())
97         d.addCallback(lambda res: self.do_test())
98         d.addBoth(self.tearDown)
99         def _err(err):
100             self.failed = err
101             log.err(err)
102             print err
103         d.addErrback(_err)
104         def _done(res):
105             reactor.stop()
106             return res
107         d.addBoth(_done)
108         reactor.run()
109         if self.failed:
110             # raiseException doesn't work for CopiedFailures
111             self.failed.raiseException()
112
113     def setUp(self):
114         #print "STARTING"
115         self.stats = {}
116         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
117         d = self.make_introducer()
118         def _more(res):
119             return self.start_client()
120         d.addCallback(_more)
121         def _record_control_furl(control_furl):
122             self.control_furl = control_furl
123             #print "OBTAINING '%s'" % (control_furl,)
124             return self.tub.getReference(self.control_furl)
125         d.addCallback(_record_control_furl)
126         def _record_control(control_rref):
127             self.control_rref = control_rref
128         d.addCallback(_record_control)
129         def _ready(res):
130             #print "CLIENT READY"
131             pass
132         d.addCallback(_ready)
133         return d
134
135     def record_initial_memusage(self):
136         print
137         print "Client started (no connections yet)"
138         d = self._print_usage()
139         d.addCallback(self.stash_stats, "init")
140         return d
141
142     def wait_for_client_connected(self):
143         print
144         print "Client connecting to other nodes.."
145         return self.control_rref.callRemote("wait_for_client_connections",
146                                             self.numnodes+1)
147
148     def tearDown(self, passthrough):
149         # the client node will shut down in a few seconds
150         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
151         log.msg("shutting down SystemTest services")
152         if self.keepalive_file and os.path.exists(self.keepalive_file):
153             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
154             log.msg("keepalive file at shutdown was %ds old" % age)
155         d = defer.succeed(None)
156         if self.proc:
157             d.addCallback(lambda res: self.kill_client())
158         d.addCallback(lambda res: self.sparent.stopService())
159         d.addCallback(lambda res: flushEventualQueue())
160         def _close_statsfile(res):
161             self.statsfile.close()
162         d.addCallback(_close_statsfile)
163         d.addCallback(lambda res: passthrough)
164         return d
165
166     def add_service(self, s):
167         s.setServiceParent(self.sparent)
168         return s
169
170     def make_introducer(self):
171         iv_basedir = os.path.join(self.testdir, "introducer")
172         os.mkdir(iv_basedir)
173         iv = introducer.IntroducerNode(basedir=iv_basedir)
174         self.introducer = self.add_service(iv)
175         d = self.introducer.when_tub_ready()
176         def _introducer_ready(res):
177             q = self.introducer
178             self.introducer_furl = q.introducer_url
179         d.addCallback(_introducer_ready)
180         return d
181
182     def make_nodes(self):
183         self.nodes = []
184         for i in range(self.numnodes):
185             nodedir = os.path.join(self.testdir, "node%d" % i)
186             os.mkdir(nodedir)
187             f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
188             f.write("[client]\n"
189                     "introducer.furl = %s\n"
190                     "shares.happy = 1\n"
191                     "[storage]\n"
192                     % (self.introducer_furl,))
193             # the only tests for which we want the internal nodes to actually
194             # retain shares are the ones where somebody's going to download
195             # them.
196             if self.mode in ("download", "download-GET", "download-GET-slow"):
197                 # retain shares
198                 pass
199             else:
200                 # for these tests, we tell the storage servers to pretend to
201                 # accept shares, but really just throw them out, since we're
202                 # only testing upload and not download.
203                 f.write("debug_discard = true\n")
204             if self.mode in ("receive",):
205                 # for this mode, the client-under-test gets all the shares,
206                 # so our internal nodes can refuse requests
207                 f.write("readonly = true\n")
208             f.close()
209             c = self.add_service(client.Client(basedir=nodedir))
210             self.nodes.append(c)
211         # the peers will start running, eventually they will connect to each
212         # other and the introducer
213
214     def touch_keepalive(self):
215         if os.path.exists(self.keepalive_file):
216             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
217             log.msg("touching keepalive file, was %ds old" % age)
218         f = open(self.keepalive_file, "w")
219         f.write("""\
220 If the node notices this file at startup, it will poll every 5 seconds and
221 terminate if the file is more than 10 seconds old, or if it has been deleted.
222 If the test harness has an internal failure and neglects to kill off the node
223 itself, this helps to avoid leaving processes lying around. The contents of
224 this file are ignored.
225         """)
226         f.close()
227
228     def start_client(self):
229         # this returns a Deferred that fires with the client's control.furl
230         log.msg("MAKING CLIENT")
231         # self.testdir is an absolute Unicode path
232         clientdir = self.clientdir = os.path.join(self.testdir, u"client")
233         clientdir_str = clientdir.encode(get_filesystem_encoding())
234         quiet = StringIO()
235         create_node.create_node({'basedir': clientdir}, out=quiet)
236         log.msg("DONE MAKING CLIENT")
237         # now replace tahoe.cfg
238         # set webport=0 and then ask the node what port it picked.
239         f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
240         f.write("[node]\n"
241                 "web.port = tcp:0:interface=127.0.0.1\n"
242                 "[client]\n"
243                 "introducer.furl = %s\n"
244                 "shares.happy = 1\n"
245                 "[storage]\n"
246                 % (self.introducer_furl,))
247
248         if self.mode in ("upload-self", "receive"):
249             # accept and store shares, to trigger the memory consumption bugs
250             pass
251         else:
252             # don't accept any shares
253             f.write("readonly = true\n")
254             ## also, if we do receive any shares, throw them away
255             #f.write("debug_discard = true")
256         if self.mode == "upload-self":
257             pass
258         f.close()
259         self.keepalive_file = os.path.join(clientdir,
260                                            "suicide_prevention_hotline")
261         # now start updating the mtime.
262         self.touch_keepalive()
263         ts = internet.TimerService(1.0, self.touch_keepalive)
264         ts.setServiceParent(self.sparent)
265
266         pp = ClientWatcher()
267         self.proc_done = pp.d = defer.Deferred()
268         logfile = os.path.join(self.basedir, "client.log")
269         cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
270         env = os.environ.copy()
271         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
272         log.msg("CLIENT STARTED")
273
274         # now we wait for the client to get started. we're looking for the
275         # control.furl file to appear.
276         furl_file = os.path.join(clientdir, "private", "control.furl")
277         url_file = os.path.join(clientdir, "node.url")
278         def _check():
279             if pp.ended and pp.ended.value.status != 0:
280                 # the twistd process ends normally (with rc=0) if the child
281                 # is successfully launched. It ends abnormally (with rc!=0)
282                 # if the child cannot be launched.
283                 raise ChildDidNotStartError("process ended while waiting for startup")
284             return os.path.exists(furl_file)
285         d = self.poll(_check, 0.1)
286         # once it exists, wait a moment before we read from it, just in case
287         # it hasn't finished writing the whole thing. Ideally control.furl
288         # would be created in some atomic fashion, or made non-readable until
289         # it's ready, but I can't think of an easy way to do that, and I
290         # think the chances that we'll observe a half-write are pretty low.
291         def _stall(res):
292             d2 = defer.Deferred()
293             reactor.callLater(0.1, d2.callback, None)
294             return d2
295         d.addCallback(_stall)
296         def _read(res):
297             # read the node's URL
298             self.webish_url = open(url_file, "r").read().strip()
299             if self.webish_url[-1] == "/":
300                 # trim trailing slash, since the rest of the code wants it gone
301                 self.webish_url = self.webish_url[:-1]
302             f = open(furl_file, "r")
303             furl = f.read()
304             return furl.strip()
305         d.addCallback(_read)
306         return d
307
308
309     def kill_client(self):
310         # returns a Deferred that fires when the process exits. This may only
311         # be called once.
312         try:
313             self.proc.signalProcess("INT")
314         except error.ProcessExitedAlready:
315             pass
316         return self.proc_done
317
318
319     def create_data(self, name, size):
320         filename = os.path.join(self.testdir, name + ".data")
321         f = open(filename, "wb")
322         block = "a" * 8192
323         while size > 0:
324             l = min(size, 8192)
325             f.write(block[:l])
326             size -= l
327         return filename
328
329     def stash_stats(self, stats, name):
330         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
331         self.statsfile.flush()
332         self.stats[name] = stats['VmPeak']
333
334     def POST(self, urlpath, **fields):
335         url = self.webish_url + urlpath
336         sepbase = "boogabooga"
337         sep = "--" + sepbase
338         form = []
339         form.append(sep)
340         form.append('Content-Disposition: form-data; name="_charset"')
341         form.append('')
342         form.append('UTF-8')
343         form.append(sep)
344         for name, value in fields.iteritems():
345             if isinstance(value, tuple):
346                 filename, value = value
347                 form.append('Content-Disposition: form-data; name="%s"; '
348                             'filename="%s"' % (name, filename))
349             else:
350                 form.append('Content-Disposition: form-data; name="%s"' % name)
351             form.append('')
352             form.append(value)
353             form.append(sep)
354         form[-1] += "--"
355         body = "\r\n".join(form) + "\r\n"
356         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
357                    }
358         return tw_client.getPage(url, method="POST", postdata=body,
359                                  headers=headers, followRedirect=False)
360
361     def GET_discard(self, urlpath, stall):
362         url = self.webish_url + urlpath + "?filename=dummy-get.out"
363         return discardPage(url, stall)
364
365     def _print_usage(self, res=None):
366         d = self.control_rref.callRemote("get_memory_usage")
367         def _print(stats):
368             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
369                                                 stats["VmPeak"])
370             return stats
371         d.addCallback(_print)
372         return d
373
374     def _do_upload(self, res, size, files, uris):
375         name = '%d' % size
376         print
377         print "uploading %s" % name
378         if self.mode in ("upload", "upload-self"):
379             files[name] = self.create_data(name, size)
380             d = self.control_rref.callRemote("upload_from_file_to_uri",
381                                              files[name].encode("utf-8"),
382                                              convergence="check-memory")
383             def _done(uri):
384                 os.remove(files[name])
385                 del files[name]
386                 return uri
387             d.addCallback(_done)
388         elif self.mode == "upload-POST":
389             data = "a" * size
390             url = "/uri"
391             d = self.POST(url, t="upload", file=("%d.data" % size, data))
392         elif self.mode in ("receive",
393                            "download", "download-GET", "download-GET-slow"):
394             # mode=receive: upload the data from a local peer, so that the
395             # client-under-test receives and stores the shares
396             #
397             # mode=download*: upload the data from a local peer, then have
398             # the client-under-test download it.
399             #
400             # we need to wait until the uploading node has connected to all
401             # peers, since the wait_for_client_connections() above doesn't
402             # pay attention to our self.nodes[] and their connections.
403             files[name] = self.create_data(name, size)
404             u = self.nodes[0].getServiceNamed("uploader")
405             d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
406             d.addCallback(lambda res:
407                           u.upload(upload.FileName(files[name],
408                                                    convergence="check-memory")))
409             d.addCallback(lambda results: results.uri)
410         else:
411             raise ValueError("unknown mode=%s" % self.mode)
412         def _complete(uri):
413             uris[name] = uri
414             print "uploaded %s" % name
415         d.addCallback(_complete)
416         return d
417
418     def _do_download(self, res, size, uris):
419         if self.mode not in ("download", "download-GET", "download-GET-slow"):
420             return
421         name = '%d' % size
422         print "downloading %s" % name
423         uri = uris[name]
424
425         if self.mode == "download":
426             d = self.control_rref.callRemote("download_from_uri_to_file",
427                                              uri, "dummy.out")
428         elif self.mode == "download-GET":
429             url = "/uri/%s" % uri
430             d = self.GET_discard(urllib.quote(url), stall=False)
431         elif self.mode == "download-GET-slow":
432             url = "/uri/%s" % uri
433             d = self.GET_discard(urllib.quote(url), stall=True)
434
435         def _complete(res):
436             print "downloaded %s" % name
437             return res
438         d.addCallback(_complete)
439         return d
440
441     def do_test(self):
442         #print "CLIENT STARTED"
443         #print "FURL", self.control_furl
444         #print "RREF", self.control_rref
445         #print
446         kB = 1000; MB = 1000*1000
447         files = {}
448         uris = {}
449
450         d = self._print_usage()
451         d.addCallback(self.stash_stats, "0B")
452
453         for i in range(10):
454             d.addCallback(self._do_upload, 10*kB+i, files, uris)
455             d.addCallback(self._do_download, 10*kB+i, uris)
456             d.addCallback(self._print_usage)
457         d.addCallback(self.stash_stats, "10kB")
458
459         for i in range(3):
460             d.addCallback(self._do_upload, 10*MB+i, files, uris)
461             d.addCallback(self._do_download, 10*MB+i, uris)
462             d.addCallback(self._print_usage)
463         d.addCallback(self.stash_stats, "10MB")
464
465         for i in range(1):
466             d.addCallback(self._do_upload, 50*MB+i, files, uris)
467             d.addCallback(self._do_download, 50*MB+i, uris)
468             d.addCallback(self._print_usage)
469         d.addCallback(self.stash_stats, "50MB")
470
471         #for i in range(1):
472         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
473         #    d.addCallback(self._do_download, 100*MB+i, uris)
474         #    d.addCallback(self._print_usage)
475         #d.addCallback(self.stash_stats, "100MB")
476
477         #d.addCallback(self.stall)
478         def _done(res):
479             print "FINISHING"
480         d.addCallback(_done)
481         return d
482
483     def stall(self, res):
484         d = defer.Deferred()
485         reactor.callLater(5, d.callback, None)
486         return d
487
488
489 class ClientWatcher(protocol.ProcessProtocol):
490     ended = False
491     def outReceived(self, data):
492         print "OUT:", data
493     def errReceived(self, data):
494         print "ERR:", data
495     def processEnded(self, reason):
496         self.ended = reason
497         self.d.callback(None)
498
499
if __name__ == '__main__':
    # mode comes from the first command-line argument, defaulting to "upload"
    mode = sys.argv[1] if len(sys.argv) > 1 else "upload"
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    SystemFramework("_test_memory", mode).run()
509