#! /usr/bin/env python

import os, shutil, sys, urllib, time, stat
from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
from allmydata import client, introducer
from allmydata.immutable import upload
from allmydata.scripts import create_node
from allmydata.util import fileutil, pollmixin
from foolscap.api import Tub, fireEventually, flushEventualQueue
from twisted.python import log

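# A page getter that throws away the response body as it arrives. When its
# factory's do_stall flag is set, it pauses the transport after roughly the
# first 1MB and resumes ten seconds later, simulating a slow HTTP reader so
# we can watch the node's memory while a download is stalled.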
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    full_speed_ahead = False
    _bytes_so_far = 0
    stalled = None
    def handleResponsePart(self, data):
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                print "STALLING"
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)
    def _resume_speed(self):
        print "RESUME SPEED"
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()
    def handleResponseEnd(self):
        if self.stalled:
            print "CANCEL"
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)

class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    protocol = StallableHTTPGetterDiscarder

def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred

class ChildDidNotStartError(Exception):
    pass

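# SystemFramework drives the whole measurement: it starts an introducer and
# numnodes in-process storage nodes, launches a separate client-under-test
# process with twistd, then performs a series of uploads/downloads while
# sampling the client's memory usage through its control port.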
class SystemFramework(pollmixin.PollMixin):
    numnodes = 5

    def __init__(self, basedir, mode):
        self.basedir = basedir = os.path.abspath(basedir)
        if not basedir.startswith(os.path.abspath(".")):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.proc = None
        self.tub = Tub()
        self.tub.setOption("expose-remote-exception-types", False)
        self.tub.setServiceParent(self.sparent)
        self.mode = mode
        self.failed = False
        self.keepalive_file = None

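    # run() is the synchronous entry point: it starts logging, chains the
    # setup/test/teardown steps onto a Deferred, runs the reactor until they
    # finish, and then re-raises any failure recorded along the way.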
    def run(self):
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        d = fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        def _err(err):
            self.failed = err
            log.err(err)
            print err
        d.addErrback(_err)
        def _done(res):
            reactor.stop()
            return res
        d.addBoth(_done)
        reactor.run()
        if self.failed:
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()

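    # setUp builds the introducer and the client-under-test, then uses our
    # Tub to resolve the client's control.furl into a live remote reference
    # (self.control_rref) that the rest of the test talks to.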
    def setUp(self):
        #print "STARTING"
        self.stats = {}
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
        def _more(res):
            return self.start_client()
        d.addCallback(_more)
        def _record_control_furl(control_furl):
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
        def _ready(res):
            #print "CLIENT READY"
            pass
        d.addCallback(_ready)
        return d

    def record_initial_memusage(self):
        print
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")
        return d

    def wait_for_client_connected(self):
        print
        print "Client connecting to other nodes.."
        return self.control_rref.callRemote("wait_for_client_connections",
                                            self.numnodes+1)

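    # tearDown is added with addBoth, so it runs on success and on failure.
    # It kills the client process (if we started one), stops our services,
    # drains foolscap's eventual-send queue, closes the stats file, and then
    # passes the original result (or Failure) through unchanged.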
    def tearDown(self, passthrough):
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        if self.proc:
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        d.addCallback(lambda res: passthrough)
        return d

    def add_service(self, s):
        s.setServiceParent(self.sparent)
        return s

    def make_introducer(self):
        iv_basedir = os.path.join(self.testdir, "introducer")
        os.mkdir(iv_basedir)
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            q = self.introducer
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)
        return d

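    # make_nodes creates the in-process storage nodes and points them at the
    # introducer. Unless a download mode is being tested, they are marked
    # debug_no_storage so accepted shares are discarded; in "receive" mode
    # they are additionally made read-only so every share lands on the
    # client-under-test instead.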
    def make_nodes(self):
        self.nodes = []
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            os.mkdir(nodedir)
            f = open(os.path.join(nodedir, "introducer.furl"), "w")
            f.write(self.introducer_furl)
            f.close()
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            # them.
            if self.mode in ("download", "download-GET", "download-GET-slow"):
                # retain shares
                pass
            else:
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
                f.write("no_storage\n")
                f.close()
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f = open(os.path.join(nodedir, "readonly_storage"), "w")
                f.write("\n")
                f.close()
            c = self.add_service(client.Client(basedir=nodedir))
            self.nodes.append(c)
        # the peers will start running, eventually they will connect to each
        # other and the introducer

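    # The keepalive file acts as a dead-man's switch: the client-under-test
    # notices it at startup and will terminate if it goes stale or is
    # deleted, so a harness crash does not leave a stray twistd process
    # behind. A TimerService in start_client() re-touches it every second.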
    def touch_keepalive(self):
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
        """)
        f.close()

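    # start_client creates a fresh client node with create_node.create_node,
    # writes its introducer.furl and webport configuration, spawns it in a
    # separate process under twistd, and polls until private/control.furl
    # appears on disk.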
    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        clientdir = self.clientdir = os.path.join(self.testdir, "client")
        quiet = StringIO()
        create_node.create_node(clientdir, {}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        f = open(os.path.join(clientdir, "introducer.furl"), "w")
        f.write(self.introducer_furl + "\n")
        f.close()

        # set webport=0 and then ask the node what port it picked.
        f = open(os.path.join(clientdir, "webport"), "w")
        f.write("tcp:0:interface=127.0.0.1\n")
        f.close()

        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
            pass
        else:
            # don't accept any shares
            f = open(os.path.join(clientdir, "readonly_storage"), "w")
            f.write("true\n")
            f.close()
            ## also, if we do receive any shares, throw them away
            #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
            #f.write("no_storage\n")
            #f.close()
        if self.mode == "upload-self":
            pass
        self.keepalive_file = os.path.join(clientdir,
                                           "suicide_prevention_hotline")
        # now start updating the mtime.
        self.touch_keepalive()
        ts = internet.TimerService(1.0, self.touch_keepalive)
        ts.setServiceParent(self.sparent)

        pp = ClientWatcher()
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
        log.msg("CLIENT STARTED")

        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        url_file = os.path.join(clientdir, "node.url")
        def _check():
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise ChildDidNotStartError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
        def _stall(res):
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
            return d2
        d.addCallback(_stall)
        def _read(res):
            # read the node's URL
            self.webish_url = open(url_file, "r").read().strip()
            if self.webish_url[-1] == "/":
                # trim trailing slash, since the rest of the code wants it gone
                self.webish_url = self.webish_url[:-1]
            f = open(furl_file, "r")
            furl = f.read()
            return furl.strip()
        d.addCallback(_read)
        return d


    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        try:
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
            pass
        return self.proc_done


    def create_data(self, name, size):
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")
        block = "a" * 8192
        while size > 0:
            l = min(size, 8192)
            f.write(block[:l])
            size -= l
        # close explicitly so the data is flushed to disk before the uploader
        # (possibly in another process) opens the file by name
        f.close()
        return filename

    def stash_stats(self, stats, name):
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

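    # POST hand-builds a multipart/form-data body (boundary "boogabooga")
    # and submits it to the node's web API with getPage. A tuple value is
    # treated as a (filename, contents) file field; everything else becomes
    # a plain form field.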
    def POST(self, urlpath, **fields):
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(value)
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

    def _print_usage(self, res=None):
        d = self.control_rref.callRemote("get_memory_usage")
        def _print(stats):
            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
                                                stats["VmPeak"])
            return stats
        d.addCallback(_print)
        return d

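    # _do_upload pushes `size` bytes through whichever path the mode calls
    # for: the control port's upload_from_file_to_uri (upload/upload-self),
    # a web-API POST (upload-POST), or an upload performed by one of the
    # in-process helper nodes (receive and the download modes). The returned
    # Deferred fires once the resulting URI has been recorded in uris[].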
    def _do_upload(self, res, size, files, uris):
        name = '%d' % size
        print
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name], convergence="check-memory convergence string")
            def _done(uri):
                os.remove(files[name])
                del files[name]
                return uri
            d.addCallback(_done)
        elif self.mode == "upload-POST":
            data = "a" * size
            url = "/uri"
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            #
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            #
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res: u.upload(upload.FileName(files[name], convergence="check-memory convergence string")))
            d.addCallback(lambda results: results.uri)
        else:
            raise ValueError("unknown mode=%s" % self.mode)
        def _complete(uri):
            uris[name] = uri
            print "uploaded %s" % name
        d.addCallback(_complete)
        return d

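    # _do_download is a no-op except in the download modes, where it pulls
    # the just-uploaded file back through the client-under-test: either via
    # the control port or via an HTTP GET against the web API (optionally
    # with the stalling reader from discardPage).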
    def _do_download(self, res, size, uris):
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            return
        name = '%d' % size
        print "downloading %s" % name
        uri = uris[name]

        if self.mode == "download":
            d = self.control_rref.callRemote("download_from_uri_to_file",
                                             uri, "dummy.out")
        elif self.mode == "download-GET":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)

        def _complete(res):
            print "downloaded %s" % name
            return res
        d.addCallback(_complete)
        return d

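    # do_test records a baseline memory sample and then runs the upload (and
    # optional download) cycle at three sizes: ten files of ~10kB, three of
    # ~10MB, and one of ~50MB, stashing VmPeak after each batch so the
    # numbers end up in stats.out.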
    def do_test(self):
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        #print
        kB = 1000; MB = 1000*1000
        files = {}
        uris = {}

        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")

        for i in range(10):
            d.addCallback(self._do_upload, 10*kB+i, files, uris)
            d.addCallback(self._do_download, 10*kB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")

        for i in range(3):
            d.addCallback(self._do_upload, 10*MB+i, files, uris)
            d.addCallback(self._do_download, 10*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")

        for i in range(1):
            d.addCallback(self._do_upload, 50*MB+i, files, uris)
            d.addCallback(self._do_download, 50*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")

        #for i in range(1):
        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
        #    d.addCallback(self._do_download, 100*MB+i, uris)
        #    d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")

        #d.addCallback(self.stall)
        def _done(res):
            print "FINISHING"
        d.addCallback(_done)
        return d

    def stall(self, res):
        d = defer.Deferred()
        reactor.callLater(5, d.callback, None)
        return d


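# ClientWatcher is the ProcessProtocol attached to the spawned twistd
# process: it echoes the child's stdout/stderr and, when the process ends,
# stores the termination reason in .ended and fires .d (used as
# self.proc_done above).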
class ClientWatcher(protocol.ProcessProtocol):
    ended = False
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        self.ended = reason
        self.d.callback(None)


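# When run as a script, the optional first argument selects the mode:
# upload (the default), upload-self, upload-POST, download, download-GET,
# download-GET-slow, or receive.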
if __name__ == '__main__':
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()