#! /usr/bin/env python
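"""Memory-usage measurement harness for a Tahoe-LAFS client node.

This driver builds a tiny grid (an introducer plus a handful of in-process
storage nodes), spawns a separate client node under 'twistd', pushes files of
increasing size through that client, and appends the client's peak memory
usage (VmPeak) after each step to _test_memory/stats.out .

Run it as:

    check_memory.py [MODE]

MODE defaults to "upload" and may be any of: upload, upload-self, upload-POST,
receive, download, download-GET, download-GET-slow. The logs and stats.out
live under _test_memory/ and stick around; the nodes themselves are created
under _test_memory/test/ , which is removed at the start of each run.
"""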

import os, shutil, sys, urllib, time, stat
from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
from allmydata import client, introducer
from allmydata.immutable import upload
from allmydata.scripts import create_node
from allmydata.util import fileutil, pollmixin
import foolscap
from foolscap import eventual
from twisted.python import log

class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
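    """HTTP page getter that downloads and discards the response body.

    When the factory's do_stall flag is set, it pauses the transport after
    roughly 1MB has arrived and resumes it (at full speed) ten seconds later,
    so the harness can watch the serving node while its HTTP reader is
    stalled.
    """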
    full_speed_ahead = False
    _bytes_so_far = 0
    stalled = None
    def handleResponsePart(self, data):
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                print "STALLING"
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)
    def _resume_speed(self):
        print "RESUME SPEED"
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()
    def handleResponseEnd(self):
        if self.stalled:
            print "CANCEL"
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)

class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    protocol = StallableHTTPGetterDiscarder

def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred

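# A minimal usage sketch (the URL below is hypothetical; GET_discard() builds
# the real one from the client's node.url):
#
#   d = discardPage("http://127.0.0.1:8123/uri/URI:CHK:...", stall=True)
#   # d fires once the (discarded) download has completed.
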
class SystemFramework(pollmixin.PollMixin):
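    """Drive one complete memory-measurement run.

    run() sets up an introducer, spawns a separate client node under
    'twistd', starts numnodes in-process storage nodes, then calls do_test(),
    which pushes files of increasing size through the client-under-test and
    records the client's memory usage (obtained over its control port) after
    each step.
    """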
    numnodes = 5

    def __init__(self, basedir, mode):
        self.basedir = basedir = os.path.abspath(basedir)
        if not basedir.startswith(os.path.abspath(".")):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.proc = None
        self.tub = foolscap.Tub()
        self.tub.setServiceParent(self.sparent)
        self.mode = mode
        self.failed = False
        self.keepalive_file = None

    def run(self):
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        d = eventual.fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        def _err(err):
            self.failed = err
            log.err(err)
            print err
        d.addErrback(_err)
        def _done(res):
            reactor.stop()
            return res
        d.addBoth(_done)
        reactor.run()
        if self.failed:
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()

    def setUp(self):
        #print "STARTING"
        self.stats = {}
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
        def _more(res):
            return self.start_client()
        d.addCallback(_more)
        def _record_control_furl(control_furl):
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
        def _ready(res):
            #print "CLIENT READY"
            pass
        d.addCallback(_ready)
        return d

    def record_initial_memusage(self):
        print
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")
        return d

    def wait_for_client_connected(self):
        print
        print "Client connecting to other nodes.."
        return self.control_rref.callRemote("wait_for_client_connections",
                                            self.numnodes+1)

    def tearDown(self, passthrough):
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        if self.proc:
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: eventual.flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        d.addCallback(lambda res: passthrough)
        return d

    def add_service(self, s):
        s.setServiceParent(self.sparent)
        return s

    def make_introducer(self):
        iv_basedir = os.path.join(self.testdir, "introducer")
        os.mkdir(iv_basedir)
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            q = self.introducer
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)
        return d

    def make_nodes(self):
        self.nodes = []
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            os.mkdir(nodedir)
            f = open(os.path.join(nodedir, "introducer.furl"), "w")
            f.write(self.introducer_furl)
            f.close()
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            # them.
            if self.mode in ("download", "download-GET", "download-GET-slow"):
                # retain shares
                pass
            else:
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
                f.write("no_storage\n")
                f.close()
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f = open(os.path.join(nodedir, "sizelimit"), "w")
                f.write("0\n")
                f.close()
            c = self.add_service(client.Client(basedir=nodedir))
            self.nodes.append(c)
        # the peers will start running; eventually they will connect to each
        # other and to the introducer

    def touch_keepalive(self):
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
        """)
        f.close()

    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        clientdir = self.clientdir = os.path.join(self.testdir, "client")
        quiet = StringIO()
        create_node.create_client(clientdir, {}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        f = open(os.path.join(clientdir, "introducer.furl"), "w")
        f.write(self.introducer_furl + "\n")
        f.close()

        # set webport=0 and then ask the node what port it picked.
        f = open(os.path.join(clientdir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
        f.close()

        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
            pass
        else:
            # don't accept any shares
            f = open(os.path.join(clientdir, "readonly_storage"), "w")
            f.write("true\n")
            f.close()
            ## also, if we do receive any shares, throw them away
            #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
            #f.write("no_storage\n")
            #f.close()
        if self.mode == "upload-self":
            pass
        self.keepalive_file = os.path.join(clientdir,
                                           "suicide_prevention_hotline")
        # now start updating the mtime.
        self.touch_keepalive()
        ts = internet.TimerService(1.0, self.touch_keepalive)
        ts.setServiceParent(self.sparent)

        pp = ClientWatcher()
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
        log.msg("CLIENT STARTED")

        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        url_file = os.path.join(clientdir, "node.url")
        def _check():
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise RuntimeError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
        def _stall(res):
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
            return d2
        d.addCallback(_stall)
        def _read(res):
            # read the node's URL
            self.webish_url = open(url_file, "r").read().strip()
            if self.webish_url[-1] == "/":
                # trim trailing slash, since the rest of the code wants it gone
                self.webish_url = self.webish_url[:-1]
            f = open(furl_file, "r")
            furl = f.read()
            f.close()
            return furl.strip()
        d.addCallback(_read)
        return d


    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        try:
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
            pass
        return self.proc_done


    def create_data(self, name, size):
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")
        block = "a" * 8192
        while size > 0:
            l = min(size, 8192)
            f.write(block[:l])
            size -= l
        f.close()
        return filename

    def stash_stats(self, stats, name):
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

    def POST(self, urlpath, **fields):
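        """Hand-roll a multipart/form-data POST to the client's web API.

        Each keyword argument becomes one form field; a tuple value is
        treated as (filename, file_contents) and becomes a file-upload
        field. For example, _do_upload() calls something like
        self.POST("/uri", t="upload", file=("10000.data", data)).
        The request itself goes out via twisted.web.client.getPage.
        """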
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(value)
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

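    # get_memory_usage (on the client's control port) returns a dict whose
    # keys include at least VmSize and VmPeak; the names match fields in
    # /proc/<pid>/status, so the numbers are presumably only meaningful on
    # Linux.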
    def _print_usage(self, res=None):
        d = self.control_rref.callRemote("get_memory_usage")
        def _print(stats):
            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
                                                stats["VmPeak"])
            return stats
        d.addCallback(_print)
        return d

    def _do_upload(self, res, size, files, uris):
        name = '%d' % size
        print
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name], convergence="check-memory convergence string")
            def _done(uri):
                os.remove(files[name])
                del files[name]
                return uri
            d.addCallback(_done)
        elif self.mode == "upload-POST":
            data = "a" * size
            url = "/uri"
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            #
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            #
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
            d.addCallback(lambda results: results.uri)
        else:
            raise RuntimeError("unknown mode=%s" % self.mode)
        def _complete(uri):
            uris[name] = uri
            print "uploaded %s" % name
        d.addCallback(_complete)
        return d

    def _do_download(self, res, size, uris):
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            return
        name = '%d' % size
        print "downloading %s" % name
        uri = uris[name]

        if self.mode == "download":
            d = self.control_rref.callRemote("download_from_uri_to_file",
                                             uri, "dummy.out")
        elif self.mode == "download-GET":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)

        def _complete(res):
            print "downloaded %s" % name
            return res
        d.addCallback(_complete)
        return d

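    # do_test() is the measurement schedule: record a baseline, then upload
    # (and, in the download modes, also download) ten ~10kB files, three
    # ~10MB files, and one ~50MB file, stashing VmPeak after each group. The
    # "+i" in the sizes presumably just keeps each file distinct: the data is
    # all runs of "a", so identical sizes would be identical files.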
    def do_test(self):
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        #print
        kB = 1000; MB = 1000*1000
        files = {}
        uris = {}

        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")

        for i in range(10):
            d.addCallback(self._do_upload, 10*kB+i, files, uris)
            d.addCallback(self._do_download, 10*kB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")

        for i in range(3):
            d.addCallback(self._do_upload, 10*MB+i, files, uris)
            d.addCallback(self._do_download, 10*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")

        for i in range(1):
            d.addCallback(self._do_upload, 50*MB+i, files, uris)
            d.addCallback(self._do_download, 50*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")

        #for i in range(1):
        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
        #    d.addCallback(self._do_download, 100*MB+i, uris)
        #    d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")

        #d.addCallback(self.stall)
        def _done(res):
            print "FINISHING"
        d.addCallback(_done)
        return d

    def stall(self, res):
        d = defer.Deferred()
        reactor.callLater(5, d.callback, None)
        return d


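# ClientWatcher is the ProcessProtocol attached to the spawned twistd client
# process: it echoes the child's stdout/stderr and, when the process exits,
# stashes the exit reason in self.ended and fires self.d (assigned in
# start_client as self.proc_done).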
class ClientWatcher(protocol.ProcessProtocol):
    ended = False
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        self.ended = reason
        self.d.callback(None)


if __name__ == '__main__':
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()