]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory.py: record initial memory usage (before any connections are made)
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16     full_speed_ahead = False
17     _bytes_so_far = 0
18     stalled = None
19     def handleResponsePart(self, data):
20         self._bytes_so_far += len(data)
21         if not self.factory.do_stall:
22             return
23         if self.full_speed_ahead:
24             return
25         if self._bytes_so_far > 1e6+100:
26             if not self.stalled:
27                 print "STALLING"
28                 self.transport.pauseProducing()
29                 self.stalled = reactor.callLater(10.0, self._resume_speed)
30     def _resume_speed(self):
31         print "RESUME SPEED"
32         self.stalled = None
33         self.full_speed_ahead = True
34         self.transport.resumeProducing()
35     def handleResponseEnd(self):
36         if self.stalled:
37             print "CANCEL"
38             self.stalled.cancel()
39             self.stalled = None
40         return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose connections use StallableHTTPGetterDiscarder.

    Set ``do_stall`` to a true value on an instance to make its getter pause
    partway through the download (discardPage() does this for you).
    """
    protocol = StallableHTTPGetterDiscarder
    # Read by the getter's handleResponsePart(). It defaults to off so that a
    # factory constructed without going through discardPage() still works.
    do_stall = False
44
def discardPage(url, stall=False, *args, **kwargs):
    """Fetch an http:// URL, reading and discarding the whole body.

    If ``stall`` is true, stall our pipe after the first 1MB, wait 10
    seconds, then resume downloading (and discarding) everything. Any extra
    positional/keyword arguments are passed to the HTTPClientFactory
    constructor, as with twisted.web.client.getPage.

    Returns the factory's Deferred. Raises AssertionError if the URL's
    scheme is not 'http'.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates. (Note: tw_client._parse is a private Twisted helper.)
    scheme, host, port, path = tw_client._parse(url)
    # Validate before building the factory, using an explicit raise rather
    # than a bare assert so the check still runs under "python -O".
    if scheme != 'http':
        raise AssertionError("discardPage only supports http URLs, not %r"
                             % (scheme,))
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
58
59 class SystemFramework(testutil.PollMixin):
60     numnodes = 5
61
62     def __init__(self, basedir, mode):
63         self.basedir = basedir = os.path.abspath(basedir)
64         if not basedir.startswith(os.path.abspath(".")):
65             raise AssertionError("safety issue: basedir must be a subdir")
66         self.testdir = testdir = os.path.join(basedir, "test")
67         if os.path.exists(testdir):
68             shutil.rmtree(testdir)
69         fileutil.make_dirs(testdir)
70         self.sparent = service.MultiService()
71         self.sparent.startService()
72         self.proc = None
73         self.tub = foolscap.Tub()
74         self.tub.setServiceParent(self.sparent)
75         self.mode = mode
76         self.failed = False
77
78     def run(self):
79         log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
80                          setStdout=False)
81         #logfile = open(os.path.join(self.testdir, "log"), "w")
82         #flo = log.FileLogObserver(logfile)
83         #log.startLoggingWithObserver(flo.emit, setStdout=False)
84         d = eventual.fireEventually()
85         d.addCallback(lambda res: self.setUp())
86         d.addCallback(lambda res: self.record_initial_memusage())
87         d.addCallback(lambda res: self.make_nodes())
88         d.addCallback(lambda res: self.wait_for_client_connected())
89         d.addCallback(lambda res: self.do_test())
90         d.addBoth(self.tearDown)
91         def _err(err):
92             self.failed = err
93             log.err(err)
94             print err
95         d.addErrback(_err)
96         def _done(res):
97             reactor.stop()
98             return res
99         d.addBoth(_done)
100         reactor.run()
101         if self.failed:
102             self.failed.raiseException()
103
104     def setUp(self):
105         #print "STARTING"
106         self.stats = {}
107         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
108         d = self.make_introducer_and_vdrive()
109         def _more(res):
110             return self.start_client()
111         d.addCallback(_more)
112         def _record_control_furl(control_furl):
113             self.control_furl = control_furl
114             #print "OBTAINING '%s'" % (control_furl,)
115             return self.tub.getReference(self.control_furl)
116         d.addCallback(_record_control_furl)
117         def _record_control(control_rref):
118             self.control_rref = control_rref
119         d.addCallback(_record_control)
120         def _ready(res):
121             #print "CLIENT READY"
122             pass
123         d.addCallback(_ready)
124         return d
125
126     def record_initial_memusage(self):
127         print
128         print "Client started (no connections yet)"
129         d = self._print_usage()
130         d.addCallback(self.stash_stats, "init")
131         return d
132
133     def wait_for_client_connected(self):
134         print
135         print "Client connecting to other nodes.."
136         return self.control_rref.callRemote("wait_for_client_connections",
137                                             self.numnodes+1)
138
139     def tearDown(self, passthrough):
140         # the client node will shut down in a few seconds
141         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
142         log.msg("shutting down SystemTest services")
143         d = defer.succeed(None)
144         if self.proc:
145             d.addCallback(lambda res: self.kill_client())
146         d.addCallback(lambda res: self.sparent.stopService())
147         d.addCallback(lambda res: eventual.flushEventualQueue())
148         def _close_statsfile(res):
149             self.statsfile.close()
150         d.addCallback(_close_statsfile)
151         d.addCallback(lambda res: passthrough)
152         return d
153
154     def add_service(self, s):
155         s.setServiceParent(self.sparent)
156         return s
157
158     def make_introducer_and_vdrive(self):
159         iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
160         os.mkdir(iv_basedir)
161         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
162         self.introducer_and_vdrive = self.add_service(iv)
163         d = self.introducer_and_vdrive.when_tub_ready()
164         def _introducer_ready(res):
165             q = self.introducer_and_vdrive
166             self.introducer_furl = q.urls["introducer"]
167             self.vdrive_furl = q.urls["vdrive"]
168         d.addCallback(_introducer_ready)
169         return d
170
171     def make_nodes(self):
172         self.nodes = []
173         for i in range(self.numnodes):
174             nodedir = os.path.join(self.testdir, "node%d" % i)
175             os.mkdir(nodedir)
176             f = open(os.path.join(nodedir, "introducer.furl"), "w")
177             f.write(self.introducer_furl)
178             f.close()
179             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
180             f.write(self.vdrive_furl)
181             f.close()
182             # the only tests for which we want the internal nodes to actually
183             # retain shares are the ones where somebody's going to download
184             # them.
185             if self.mode in ("download", "download-GET", "download-GET-slow"):
186                 # retain shares
187                 pass
188             else:
189                 # for these tests, we tell the storage servers to pretend to
190                 # accept shares, but really just throw them out, since we're
191                 # only testing upload and not download.
192                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
193                 f.write("no_storage\n")
194                 f.close()
195             if self.mode in ("receive",):
196                 # for this mode, the client-under-test gets all the shares,
197                 # so our internal nodes can refuse requests
198                 f = open(os.path.join(nodedir, "sizelimit"), "w")
199                 f.write("0\n")
200                 f.close()
201             c = self.add_service(client.Client(basedir=nodedir))
202             self.nodes.append(c)
203         # the peers will start running, eventually they will connect to each
204         # other and the introducer_and_vdrive
205
206     def touch_keepalive(self):
207         f = open(self.keepalive_file, "w")
208         f.write("""\
209 If the node notices this file at startup, it will poll every 5 seconds and
210 terminate if the file is more than 10 seconds old, or if it has been deleted.
211 If the test harness has an internal failure and neglects to kill off the node
212 itself, this helps to avoid leaving processes lying around. The contents of
213 this file are ignored.
214         """)
215         f.close()
216
217     def start_client(self):
218         # this returns a Deferred that fires with the client's control.furl
219         log.msg("MAKING CLIENT")
220         clientdir = self.clientdir = os.path.join(self.testdir, "client")
221         quiet = StringIO()
222         create_node.create_client(clientdir, {}, out=quiet)
223         log.msg("DONE MAKING CLIENT")
224         f = open(os.path.join(clientdir, "introducer.furl"), "w")
225         f.write(self.introducer_furl + "\n")
226         f.close()
227         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
228         f.write(self.vdrive_furl + "\n")
229         f.close()
230         f = open(os.path.join(clientdir, "webport"), "w")
231         # TODO: ideally we would set webport=0 and then ask the node what
232         # port it picked. But at the moment it is not convenient to do this,
233         # so we just pick a relatively unique one.
234         webport = max(os.getpid(), 2000)
235         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
236         f.close()
237         self.webish_url = "http://localhost:%d" % webport
238         if self.mode in ("upload-self", "receive"):
239             # accept and store shares, to trigger the memory consumption bugs
240             pass
241         else:
242             # don't accept any shares
243             f = open(os.path.join(clientdir, "sizelimit"), "w")
244             f.write("0\n")
245             f.close()
246             ## also, if we do receive any shares, throw them away
247             #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
248             #f.write("no_storage\n")
249             #f.close()
250         if self.mode == "upload-self":
251             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
252             f.write("push_to_ourselves\n")
253             f.close()
254         self.keepalive_file = os.path.join(clientdir,
255                                            "suicide_prevention_hotline")
256         # now start updating the mtime.
257         self.touch_keepalive()
258         ts = internet.TimerService(4.0, self.touch_keepalive)
259         ts.setServiceParent(self.sparent)
260
261         pp = ClientWatcher()
262         self.proc_done = pp.d = defer.Deferred()
263         logfile = os.path.join(self.basedir, "client.log")
264         cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
265         env = os.environ.copy()
266         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
267         log.msg("CLIENT STARTED")
268
269         # now we wait for the client to get started. we're looking for the
270         # control.furl file to appear.
271         furl_file = os.path.join(clientdir, "control.furl")
272         def _check():
273             if pp.ended and pp.ended.value.status != 0:
274                 # the twistd process ends normally (with rc=0) if the child
275                 # is successfully launched. It ends abnormally (with rc!=0)
276                 # if the child cannot be launched.
277                 raise RuntimeError("process ended while waiting for startup")
278             return os.path.exists(furl_file)
279         d = self.poll(_check, 0.1)
280         # once it exists, wait a moment before we read from it, just in case
281         # it hasn't finished writing the whole thing. Ideally control.furl
282         # would be created in some atomic fashion, or made non-readable until
283         # it's ready, but I can't think of an easy way to do that, and I
284         # think the chances that we'll observe a half-write are pretty low.
285         def _stall(res):
286             d2 = defer.Deferred()
287             reactor.callLater(0.1, d2.callback, None)
288             return d2
289         d.addCallback(_stall)
290         def _read(res):
291             f = open(furl_file, "r")
292             furl = f.read()
293             return furl.strip()
294         d.addCallback(_read)
295         return d
296
297
298     def kill_client(self):
299         # returns a Deferred that fires when the process exits. This may only
300         # be called once.
301         try:
302             self.proc.signalProcess("INT")
303         except error.ProcessExitedAlready:
304             pass
305         return self.proc_done
306
307
308     def create_data(self, name, size):
309         filename = os.path.join(self.testdir, name + ".data")
310         f = open(filename, "wb")
311         block = "a" * 8192
312         while size > 0:
313             l = min(size, 8192)
314             f.write(block[:l])
315             size -= l
316         return filename
317
318     def stash_stats(self, stats, name):
319         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
320         self.statsfile.flush()
321         self.stats[name] = stats['VmPeak']
322
323     def POST(self, urlpath, **fields):
324         url = self.webish_url + urlpath
325         sepbase = "boogabooga"
326         sep = "--" + sepbase
327         form = []
328         form.append(sep)
329         form.append('Content-Disposition: form-data; name="_charset"')
330         form.append('')
331         form.append('UTF-8')
332         form.append(sep)
333         for name, value in fields.iteritems():
334             if isinstance(value, tuple):
335                 filename, value = value
336                 form.append('Content-Disposition: form-data; name="%s"; '
337                             'filename="%s"' % (name, filename))
338             else:
339                 form.append('Content-Disposition: form-data; name="%s"' % name)
340             form.append('')
341             form.append(value)
342             form.append(sep)
343         form[-1] += "--"
344         body = "\r\n".join(form) + "\r\n"
345         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
346                    }
347         return tw_client.getPage(url, method="POST", postdata=body,
348                                  headers=headers, followRedirect=False)
349
350     def GET_discard(self, urlpath, stall):
351         url = self.webish_url + urlpath + "?filename=dummy-get.out"
352         return discardPage(url, stall)
353
354     def _print_usage(self, res=None):
355         d = self.control_rref.callRemote("get_memory_usage")
356         def _print(stats):
357             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
358                                                 stats["VmPeak"])
359             return stats
360         d.addCallback(_print)
361         return d
362
363     def _do_upload(self, res, size, files, uris):
364         name = '%d' % size
365         print
366         print "uploading %s" % name
367         if self.mode in ("upload", "upload-self"):
368             files[name] = self.create_data(name, size)
369             d = self.control_rref.callRemote("upload_from_file_to_uri",
370                                              files[name])
371             def _done(uri):
372                 os.remove(files[name])
373                 del files[name]
374                 return uri
375             d.addCallback(_done)
376         elif self.mode == "upload-POST":
377             data = "a" * size
378             url = "/vdrive/global"
379             d = self.POST(url, t="upload", file=("%d.data" % size, data))
380         elif self.mode in ("receive",):
381             # upload the data from a local peer, so that the
382             # client-under-test receives and stores the shares
383             files[name] = self.create_data(name, size)
384             u = self.nodes[0].getServiceNamed("uploader")
385             d = u.upload_filename(files[name])
386         elif self.mode in ("download", "download-GET", "download-GET-slow"):
387             # upload the data from a local peer, then have the
388             # client-under-test download it.
389             files[name] = self.create_data(name, size)
390             u = self.nodes[0].getServiceNamed("uploader")
391             d = u.upload_filename(files[name])
392         else:
393             raise RuntimeError("unknown mode=%s" % self.mode)
394         def _complete(uri):
395             uris[name] = uri
396             print "uploaded %s" % name
397         d.addCallback(_complete)
398         return d
399
400     def _do_download(self, res, size, uris):
401         if self.mode not in ("download", "download-GET", "download-GET-slow"):
402             return
403         name = '%d' % size
404         print "downloading %s" % name
405         uri = uris[name]
406
407         if self.mode == "download":
408             d = self.control_rref.callRemote("download_from_uri_to_file",
409                                              uri, "dummy.out")
410         elif self.mode == "download-GET":
411             url = "/uri/%s" % uri
412             d = self.GET_discard(urllib.quote(url), stall=False)
413         elif self.mode == "download-GET-slow":
414             url = "/uri/%s" % uri
415             d = self.GET_discard(urllib.quote(url), stall=True)
416
417         def _complete(res):
418             print "downloaded %s" % name
419             return res
420         d.addCallback(_complete)
421         return d
422
423     def do_test(self):
424         #print "CLIENT STARTED"
425         #print "FURL", self.control_furl
426         #print "RREF", self.control_rref
427         #print
428         kB = 1000; MB = 1000*1000
429         files = {}
430         uris = {}
431
432         d = self._print_usage()
433         d.addCallback(self.stash_stats, "0B")
434
435         for i in range(10):
436             d.addCallback(self._do_upload, 10*kB+i, files, uris)
437             d.addCallback(self._do_download, 10*kB+i, uris)
438             d.addCallback(self._print_usage)
439         d.addCallback(self.stash_stats, "10kB")
440
441         for i in range(3):
442             d.addCallback(self._do_upload, 10*MB+i, files, uris)
443             d.addCallback(self._do_download, 10*MB+i, uris)
444             d.addCallback(self._print_usage)
445         d.addCallback(self.stash_stats, "10MB")
446
447         for i in range(1):
448             d.addCallback(self._do_upload, 50*MB+i, files, uris)
449             d.addCallback(self._do_download, 50*MB+i, uris)
450             d.addCallback(self._print_usage)
451         d.addCallback(self.stash_stats, "50MB")
452
453         #for i in range(1):
454         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
455         #    d.addCallback(self._do_download, 100*MB+i, uris)
456         #    d.addCallback(self._print_usage)
457         #d.addCallback(self.stash_stats, "100MB")
458
459         #d.addCallback(self.stall)
460         def _done(res):
461             print "FINISHING"
462         d.addCallback(_done)
463         return d
464
465     def stall(self, res):
466         d = defer.Deferred()
467         reactor.callLater(5, d.callback, None)
468         return d
469
470
471 class ClientWatcher(protocol.ProcessProtocol):
472     ended = False
473     def outReceived(self, data):
474         print "OUT:", data
475     def errReceived(self, data):
476         print "ERR:", data
477     def processEnded(self, reason):
478         self.ended = reason
479         self.d.callback(None)
480
481
482 if __name__ == '__main__':
483     mode = "upload"
484     if len(sys.argv) > 1:
485         mode = sys.argv[1]
486     # put the logfile and stats.out in _test_memory/ . These stick around.
487     # put the nodes and other files in _test_memory/test/ . These are
488     # removed each time we run.
489     sf = SystemFramework("_test_memory", mode)
490     sf.run()
491