]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory.py: don't use self.keepalive_file until it's been initialized
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16     full_speed_ahead = False
17     _bytes_so_far = 0
18     stalled = None
19     def handleResponsePart(self, data):
20         self._bytes_so_far += len(data)
21         if not self.factory.do_stall:
22             return
23         if self.full_speed_ahead:
24             return
25         if self._bytes_so_far > 1e6+100:
26             if not self.stalled:
27                 print "STALLING"
28                 self.transport.pauseProducing()
29                 self.stalled = reactor.callLater(10.0, self._resume_speed)
30     def _resume_speed(self):
31         print "RESUME SPEED"
32         self.stalled = None
33         self.full_speed_ahead = True
34         self.transport.resumeProducing()
35     def handleResponseEnd(self):
36         if self.stalled:
37             print "CANCEL"
38             self.stalled.cancel()
39             self.stalled = None
40         return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose protocol discards the body and can stall;
    callers must set .do_stall on the factory before connecting."""
    protocol = StallableHTTPGetterDiscarder
44
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.

    Returns the factory's Deferred, which fires when the fetch is done.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    assert scheme == 'http'
    f = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    f.do_stall = stall
    reactor.connectTCP(host, port, f)
    return f.deferred
58
59 class SystemFramework(testutil.PollMixin):
60     numnodes = 5
61
62     def __init__(self, basedir, mode):
63         self.basedir = basedir = os.path.abspath(basedir)
64         if not basedir.startswith(os.path.abspath(".")):
65             raise AssertionError("safety issue: basedir must be a subdir")
66         self.testdir = testdir = os.path.join(basedir, "test")
67         if os.path.exists(testdir):
68             shutil.rmtree(testdir)
69         fileutil.make_dirs(testdir)
70         self.sparent = service.MultiService()
71         self.sparent.startService()
72         self.proc = None
73         self.tub = foolscap.Tub()
74         self.tub.setServiceParent(self.sparent)
75         self.mode = mode
76         self.failed = False
77         self.keepalive_file = None
78
79     def run(self):
80         framelog = os.path.join(self.basedir, "driver.log")
81         log.startLogging(open(framelog, "a"), setStdout=False)
82         log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
83         #logfile = open(os.path.join(self.testdir, "log"), "w")
84         #flo = log.FileLogObserver(logfile)
85         #log.startLoggingWithObserver(flo.emit, setStdout=False)
86         d = eventual.fireEventually()
87         d.addCallback(lambda res: self.setUp())
88         d.addCallback(lambda res: self.record_initial_memusage())
89         d.addCallback(lambda res: self.make_nodes())
90         d.addCallback(lambda res: self.wait_for_client_connected())
91         d.addCallback(lambda res: self.do_test())
92         d.addBoth(self.tearDown)
93         def _err(err):
94             self.failed = err
95             log.err(err)
96             print err
97         d.addErrback(_err)
98         def _done(res):
99             reactor.stop()
100             return res
101         d.addBoth(_done)
102         reactor.run()
103         if self.failed:
104             # raiseException doesn't work for CopiedFailures
105             self.failed.raiseException()
106
107     def setUp(self):
108         #print "STARTING"
109         self.stats = {}
110         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
111         d = self.make_introducer_and_vdrive()
112         def _more(res):
113             return self.start_client()
114         d.addCallback(_more)
115         def _record_control_furl(control_furl):
116             self.control_furl = control_furl
117             #print "OBTAINING '%s'" % (control_furl,)
118             return self.tub.getReference(self.control_furl)
119         d.addCallback(_record_control_furl)
120         def _record_control(control_rref):
121             self.control_rref = control_rref
122         d.addCallback(_record_control)
123         def _ready(res):
124             #print "CLIENT READY"
125             pass
126         d.addCallback(_ready)
127         return d
128
129     def record_initial_memusage(self):
130         print
131         print "Client started (no connections yet)"
132         d = self._print_usage()
133         d.addCallback(self.stash_stats, "init")
134         return d
135
136     def wait_for_client_connected(self):
137         print
138         print "Client connecting to other nodes.."
139         return self.control_rref.callRemote("wait_for_client_connections",
140                                             self.numnodes+1)
141
142     def tearDown(self, passthrough):
143         # the client node will shut down in a few seconds
144         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
145         log.msg("shutting down SystemTest services")
146         if self.keepalive_file and os.path.exists(self.keepalive_file):
147             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
148             log.msg("keepalive file at shutdown was %ds old" % age)
149         d = defer.succeed(None)
150         if self.proc:
151             d.addCallback(lambda res: self.kill_client())
152         d.addCallback(lambda res: self.sparent.stopService())
153         d.addCallback(lambda res: eventual.flushEventualQueue())
154         def _close_statsfile(res):
155             self.statsfile.close()
156         d.addCallback(_close_statsfile)
157         d.addCallback(lambda res: passthrough)
158         return d
159
160     def add_service(self, s):
161         s.setServiceParent(self.sparent)
162         return s
163
164     def make_introducer_and_vdrive(self):
165         iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
166         os.mkdir(iv_basedir)
167         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
168         self.introducer_and_vdrive = self.add_service(iv)
169         d = self.introducer_and_vdrive.when_tub_ready()
170         def _introducer_ready(res):
171             q = self.introducer_and_vdrive
172             self.introducer_furl = q.urls["introducer"]
173             self.vdrive_furl = q.urls["vdrive"]
174         d.addCallback(_introducer_ready)
175         return d
176
177     def make_nodes(self):
178         self.nodes = []
179         for i in range(self.numnodes):
180             nodedir = os.path.join(self.testdir, "node%d" % i)
181             os.mkdir(nodedir)
182             f = open(os.path.join(nodedir, "introducer.furl"), "w")
183             f.write(self.introducer_furl)
184             f.close()
185             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
186             f.write(self.vdrive_furl)
187             f.close()
188             # the only tests for which we want the internal nodes to actually
189             # retain shares are the ones where somebody's going to download
190             # them.
191             if self.mode in ("download", "download-GET", "download-GET-slow"):
192                 # retain shares
193                 pass
194             else:
195                 # for these tests, we tell the storage servers to pretend to
196                 # accept shares, but really just throw them out, since we're
197                 # only testing upload and not download.
198                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
199                 f.write("no_storage\n")
200                 f.close()
201             if self.mode in ("receive",):
202                 # for this mode, the client-under-test gets all the shares,
203                 # so our internal nodes can refuse requests
204                 f = open(os.path.join(nodedir, "sizelimit"), "w")
205                 f.write("0\n")
206                 f.close()
207             c = self.add_service(client.Client(basedir=nodedir))
208             self.nodes.append(c)
209         # the peers will start running, eventually they will connect to each
210         # other and the introducer_and_vdrive
211
212     def touch_keepalive(self):
213         if os.path.exists(self.keepalive_file):
214             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
215             log.msg("touching keepalive file, was %ds old" % age)
216         f = open(self.keepalive_file, "w")
217         f.write("""\
218 If the node notices this file at startup, it will poll every 5 seconds and
219 terminate if the file is more than 10 seconds old, or if it has been deleted.
220 If the test harness has an internal failure and neglects to kill off the node
221 itself, this helps to avoid leaving processes lying around. The contents of
222 this file are ignored.
223         """)
224         f.close()
225
226     def start_client(self):
227         # this returns a Deferred that fires with the client's control.furl
228         log.msg("MAKING CLIENT")
229         clientdir = self.clientdir = os.path.join(self.testdir, "client")
230         quiet = StringIO()
231         create_node.create_client(clientdir, {}, out=quiet)
232         log.msg("DONE MAKING CLIENT")
233         f = open(os.path.join(clientdir, "introducer.furl"), "w")
234         f.write(self.introducer_furl + "\n")
235         f.close()
236         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
237         f.write(self.vdrive_furl + "\n")
238         f.close()
239         f = open(os.path.join(clientdir, "webport"), "w")
240         # TODO: ideally we would set webport=0 and then ask the node what
241         # port it picked. But at the moment it is not convenient to do this,
242         # so we just pick a relatively unique one.
243         webport = max(os.getpid(), 2000)
244         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
245         f.close()
246         self.webish_url = "http://localhost:%d" % webport
247         if self.mode in ("upload-self", "receive"):
248             # accept and store shares, to trigger the memory consumption bugs
249             pass
250         else:
251             # don't accept any shares
252             f = open(os.path.join(clientdir, "sizelimit"), "w")
253             f.write("0\n")
254             f.close()
255             ## also, if we do receive any shares, throw them away
256             #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
257             #f.write("no_storage\n")
258             #f.close()
259         if self.mode == "upload-self":
260             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
261             f.write("push_to_ourselves\n")
262             f.close()
263         self.keepalive_file = os.path.join(clientdir,
264                                            "suicide_prevention_hotline")
265         # now start updating the mtime.
266         self.touch_keepalive()
267         ts = internet.TimerService(1.0, self.touch_keepalive)
268         ts.setServiceParent(self.sparent)
269
270         pp = ClientWatcher()
271         self.proc_done = pp.d = defer.Deferred()
272         logfile = os.path.join(self.basedir, "client.log")
273         cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
274         env = os.environ.copy()
275         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
276         log.msg("CLIENT STARTED")
277
278         # now we wait for the client to get started. we're looking for the
279         # control.furl file to appear.
280         furl_file = os.path.join(clientdir, "control.furl")
281         def _check():
282             if pp.ended and pp.ended.value.status != 0:
283                 # the twistd process ends normally (with rc=0) if the child
284                 # is successfully launched. It ends abnormally (with rc!=0)
285                 # if the child cannot be launched.
286                 raise RuntimeError("process ended while waiting for startup")
287             return os.path.exists(furl_file)
288         d = self.poll(_check, 0.1)
289         # once it exists, wait a moment before we read from it, just in case
290         # it hasn't finished writing the whole thing. Ideally control.furl
291         # would be created in some atomic fashion, or made non-readable until
292         # it's ready, but I can't think of an easy way to do that, and I
293         # think the chances that we'll observe a half-write are pretty low.
294         def _stall(res):
295             d2 = defer.Deferred()
296             reactor.callLater(0.1, d2.callback, None)
297             return d2
298         d.addCallback(_stall)
299         def _read(res):
300             f = open(furl_file, "r")
301             furl = f.read()
302             return furl.strip()
303         d.addCallback(_read)
304         return d
305
306
307     def kill_client(self):
308         # returns a Deferred that fires when the process exits. This may only
309         # be called once.
310         try:
311             self.proc.signalProcess("INT")
312         except error.ProcessExitedAlready:
313             pass
314         return self.proc_done
315
316
317     def create_data(self, name, size):
318         filename = os.path.join(self.testdir, name + ".data")
319         f = open(filename, "wb")
320         block = "a" * 8192
321         while size > 0:
322             l = min(size, 8192)
323             f.write(block[:l])
324             size -= l
325         return filename
326
327     def stash_stats(self, stats, name):
328         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
329         self.statsfile.flush()
330         self.stats[name] = stats['VmPeak']
331
332     def POST(self, urlpath, **fields):
333         url = self.webish_url + urlpath
334         sepbase = "boogabooga"
335         sep = "--" + sepbase
336         form = []
337         form.append(sep)
338         form.append('Content-Disposition: form-data; name="_charset"')
339         form.append('')
340         form.append('UTF-8')
341         form.append(sep)
342         for name, value in fields.iteritems():
343             if isinstance(value, tuple):
344                 filename, value = value
345                 form.append('Content-Disposition: form-data; name="%s"; '
346                             'filename="%s"' % (name, filename))
347             else:
348                 form.append('Content-Disposition: form-data; name="%s"' % name)
349             form.append('')
350             form.append(value)
351             form.append(sep)
352         form[-1] += "--"
353         body = "\r\n".join(form) + "\r\n"
354         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
355                    }
356         return tw_client.getPage(url, method="POST", postdata=body,
357                                  headers=headers, followRedirect=False)
358
359     def GET_discard(self, urlpath, stall):
360         url = self.webish_url + urlpath + "?filename=dummy-get.out"
361         return discardPage(url, stall)
362
363     def _print_usage(self, res=None):
364         d = self.control_rref.callRemote("get_memory_usage")
365         def _print(stats):
366             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
367                                                 stats["VmPeak"])
368             return stats
369         d.addCallback(_print)
370         return d
371
372     def _do_upload(self, res, size, files, uris):
373         name = '%d' % size
374         print
375         print "uploading %s" % name
376         if self.mode in ("upload", "upload-self"):
377             files[name] = self.create_data(name, size)
378             d = self.control_rref.callRemote("upload_from_file_to_uri",
379                                              files[name])
380             def _done(uri):
381                 os.remove(files[name])
382                 del files[name]
383                 return uri
384             d.addCallback(_done)
385         elif self.mode == "upload-POST":
386             data = "a" * size
387             url = "/vdrive/global"
388             d = self.POST(url, t="upload", file=("%d.data" % size, data))
389         elif self.mode in ("receive",
390                            "download", "download-GET", "download-GET-slow"):
391             # mode=receive: upload the data from a local peer, so that the
392             # client-under-test receives and stores the shares
393             #
394             # mode=download*: upload the data from a local peer, then have
395             # the client-under-test download it.
396             #
397             # we need to wait until the uploading node has connected to all
398             # peers, since the wait_for_client_connections() above doesn't
399             # pay attention to our self.nodes[] and their connections.
400             files[name] = self.create_data(name, size)
401             u = self.nodes[0].getServiceNamed("uploader")
402             d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
403             d.addCallback(lambda res: u.upload_filename(files[name]))
404         else:
405             raise RuntimeError("unknown mode=%s" % self.mode)
406         def _complete(uri):
407             uris[name] = uri
408             print "uploaded %s" % name
409         d.addCallback(_complete)
410         return d
411
412     def _do_download(self, res, size, uris):
413         if self.mode not in ("download", "download-GET", "download-GET-slow"):
414             return
415         name = '%d' % size
416         print "downloading %s" % name
417         uri = uris[name]
418
419         if self.mode == "download":
420             d = self.control_rref.callRemote("download_from_uri_to_file",
421                                              uri, "dummy.out")
422         elif self.mode == "download-GET":
423             url = "/uri/%s" % uri
424             d = self.GET_discard(urllib.quote(url), stall=False)
425         elif self.mode == "download-GET-slow":
426             url = "/uri/%s" % uri
427             d = self.GET_discard(urllib.quote(url), stall=True)
428
429         def _complete(res):
430             print "downloaded %s" % name
431             return res
432         d.addCallback(_complete)
433         return d
434
435     def do_test(self):
436         #print "CLIENT STARTED"
437         #print "FURL", self.control_furl
438         #print "RREF", self.control_rref
439         #print
440         kB = 1000; MB = 1000*1000
441         files = {}
442         uris = {}
443
444         d = self._print_usage()
445         d.addCallback(self.stash_stats, "0B")
446
447         for i in range(10):
448             d.addCallback(self._do_upload, 10*kB+i, files, uris)
449             d.addCallback(self._do_download, 10*kB+i, uris)
450             d.addCallback(self._print_usage)
451         d.addCallback(self.stash_stats, "10kB")
452
453         for i in range(3):
454             d.addCallback(self._do_upload, 10*MB+i, files, uris)
455             d.addCallback(self._do_download, 10*MB+i, uris)
456             d.addCallback(self._print_usage)
457         d.addCallback(self.stash_stats, "10MB")
458
459         for i in range(1):
460             d.addCallback(self._do_upload, 50*MB+i, files, uris)
461             d.addCallback(self._do_download, 50*MB+i, uris)
462             d.addCallback(self._print_usage)
463         d.addCallback(self.stash_stats, "50MB")
464
465         #for i in range(1):
466         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
467         #    d.addCallback(self._do_download, 100*MB+i, uris)
468         #    d.addCallback(self._print_usage)
469         #d.addCallback(self.stash_stats, "100MB")
470
471         #d.addCallback(self.stall)
472         def _done(res):
473             print "FINISHING"
474         d.addCallback(_done)
475         return d
476
477     def stall(self, res):
478         d = defer.Deferred()
479         reactor.callLater(5, d.callback, None)
480         return d
481
482
class ClientWatcher(protocol.ProcessProtocol):
    """Watches the spawned client-under-test process, echoing its
    stdout/stderr. The creator must assign a Deferred to .d; it fires
    (with None) when the process exits, at which point .ended holds the
    termination Failure."""
    # False until the process ends; then the ProcessDone/ProcessTerminated
    # Failure passed to processEnded
    ended = False
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        # record why we exited, then notify whoever is waiting on .d
        self.ended = reason
        self.d.callback(None)
492
493
if __name__ == '__main__':
    # optional first argument selects the test mode (default "upload")
    args = sys.argv[1:]
    mode = "upload"
    if args:
        mode = args[0]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()
503