# src/allmydata/test/check_memory.py
1 import os, shutil, sys, urllib, time, stat, urlparse
2 from cStringIO import StringIO
3 from twisted.internet import defer, reactor, protocol, error
4 from twisted.application import service, internet
5 from twisted.web import client as tw_client
6 from allmydata import client, introducer
7 from allmydata.immutable import upload
8 from allmydata.scripts import create_node
9 from allmydata.util import fileutil, pollmixin
10 from allmydata.util.fileutil import abspath_expanduser_unicode
11 from allmydata.util.encodingutil import get_filesystem_encoding
12 from foolscap.api import Tub, fireEventually, flushEventualQueue
13 from twisted.python import log
14
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16     full_speed_ahead = False
17     _bytes_so_far = 0
18     stalled = None
19     def handleResponsePart(self, data):
20         self._bytes_so_far += len(data)
21         if not self.factory.do_stall:
22             return
23         if self.full_speed_ahead:
24             return
25         if self._bytes_so_far > 1e6+100:
26             if not self.stalled:
27                 print "STALLING"
28                 self.transport.pauseProducing()
29                 self.stalled = reactor.callLater(10.0, self._resume_speed)
30     def _resume_speed(self):
31         print "RESUME SPEED"
32         self.stalled = None
33         self.full_speed_ahead = True
34         self.transport.resumeProducing()
35     def handleResponseEnd(self):
36         if self.stalled:
37             print "CANCEL"
38             self.stalled.cancel()
39             self.stalled = None
40         return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    # HTTPClientFactory whose protocol discards the page body and can stall
    # mid-download; callers must set a 'do_stall' attribute on the factory
    # (the protocol reads self.factory.do_stall).
    protocol = StallableHTTPGetterDiscarder
44
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, discarding the data as it arrives.

    If 'stall' is True, stall our pipe after the first 1MB, wait 10
    seconds, then resume downloading (and discarding) everything.

    Only plain http:// URLs are supported; raises ValueError otherwise.
    Returns the factory's Deferred, which fires when the fetch completes.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    # explicit check instead of 'assert': asserts are stripped under -O
    if scheme != 'http':
        raise ValueError("discardPage() only supports http URLs, not %r"
                         % (url,))
    host, port = netloc, 80
    if ":" in host:
        host, port = host.split(":")
        port = int(port)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
62
class ChildDidNotStartError(Exception):
    """Raised when the spawned twistd client process exits with a nonzero
    status while we are still waiting for its control.furl to appear."""
    pass
65
class SystemFramework(pollmixin.PollMixin):
    """Driver for the memory-consumption tests.

    Starts an introducer and 'numnodes' storage nodes in this process, and
    a client-under-test in a *separate* twistd process (so its memory usage
    can be sampled via its control port). Depending on 'mode', files of
    increasing size are uploaded and/or downloaded while VmSize/VmPeak are
    printed and recorded in stats.out.
    """
    numnodes = 7  # number of in-process storage nodes to launch

    def __init__(self, basedir, mode):
        # 'mode' selects what do_test() exercises: one of "upload",
        # "upload-self", "upload-POST", "receive", "download",
        # "download-GET", or "download-GET-slow"
        self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
        # safety: we rmtree part of basedir below, so insist it lives
        # under the current working directory
        if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.proc = None  # IProcessTransport of the client-under-test
        self.tub = Tub()
        self.tub.setOption("expose-remote-exception-types", False)
        self.tub.setServiceParent(self.sparent)
        self.mode = mode
        self.failed = False  # set to a Failure if the run fails
        self.keepalive_file = None

    def run(self):
        """Run the whole test under the reactor, then re-raise any failure
        once the reactor has stopped."""
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        d = fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        def _err(err):
            self.failed = err
            log.err(err)
            print err
        d.addErrback(_err)
        def _done(res):
            reactor.stop()
            return res
        d.addBoth(_done)
        reactor.run()
        if self.failed:
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()

    def setUp(self):
        """Start the introducer and the client-under-test, then obtain a
        remote reference to the client's control port."""
        #print "STARTING"
        self.stats = {}
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
        def _more(res):
            return self.start_client()
        d.addCallback(_more)
        def _record_control_furl(control_furl):
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
        def _ready(res):
            #print "CLIENT READY"
            pass
        d.addCallback(_ready)
        return d

    def record_initial_memusage(self):
        """Print and stash the client's memory usage before it has made
        any connections (baseline 'init' sample)."""
        print
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")
        return d

    def wait_for_client_connected(self):
        """Wait until the client-under-test reports numnodes+1 client
        connections (the +1 presumably counts the client's own storage
        server -- TODO confirm against the control-port implementation)."""
        print
        print "Client connecting to other nodes.."
        return self.control_rref.callRemote("wait_for_client_connections",
                                            self.numnodes+1)

    def tearDown(self, passthrough):
        """Kill the child process (if any) and stop our services; fires
        with 'passthrough' so results/failures propagate unchanged."""
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        if self.proc:
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        d.addCallback(lambda res: passthrough)
        return d

    def add_service(self, s):
        """Attach service 's' to our MultiService parent and return it."""
        s.setServiceParent(self.sparent)
        return s

    def make_introducer(self):
        """Create and start an in-process introducer node; fires once its
        Tub is ready, after recording self.introducer_furl."""
        iv_basedir = os.path.join(self.testdir, "introducer")
        os.mkdir(iv_basedir)
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            q = self.introducer
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)
        return d

    def make_nodes(self):
        """Create and start the in-process storage nodes, configured
        according to self.mode."""
        self.nodes = []
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            os.mkdir(nodedir)
            f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
            f.write("[client]\n"
                    "introducer.furl = %s\n"
                    "shares.happy = 1\n"
                    "[storage]\n"
                    % (self.introducer_furl,))
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            # them.
            if self.mode in ("download", "download-GET", "download-GET-slow"):
                # retain shares
                pass
            else:
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f.write("debug_discard = true\n")
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f.write("readonly = true\n")
            f.close()
            c = self.add_service(client.Client(basedir=nodedir))
            self.nodes.append(c)
        # the peers will start running, eventually they will connect to each
        # other and the introducer

    def touch_keepalive(self):
        """Rewrite the keepalive file (refreshing its mtime) so the child
        process knows the harness is still alive."""
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
        """)
        f.close()

    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        # self.testdir is an absolute Unicode path
        clientdir = self.clientdir = os.path.join(self.testdir, u"client")
        clientdir_str = clientdir.encode(get_filesystem_encoding())
        quiet = StringIO()
        create_node.create_node({'basedir': clientdir}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        # now replace tahoe.cfg
        # set webport=0 and then ask the node what port it picked.
        f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
        f.write("[node]\n"
                "web.port = tcp:0:interface=127.0.0.1\n"
                "[client]\n"
                "introducer.furl = %s\n"
                "shares.happy = 1\n"
                "[storage]\n"
                % (self.introducer_furl,))

        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
            pass
        else:
            # don't accept any shares
            f.write("readonly = true\n")
            ## also, if we do receive any shares, throw them away
            #f.write("debug_discard = true")
        if self.mode == "upload-self":
            pass
        f.close()
        self.keepalive_file = os.path.join(clientdir,
                                           client.Client.EXIT_TRIGGER_FILE)
        # now start updating the mtime.
        self.touch_keepalive()
        ts = internet.TimerService(1.0, self.touch_keepalive)
        ts.setServiceParent(self.sparent)

        pp = ClientWatcher()
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
        log.msg("CLIENT STARTED")

        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        url_file = os.path.join(clientdir, "node.url")
        def _check():
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise ChildDidNotStartError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
        def _stall(res):
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
            return d2
        d.addCallback(_stall)
        def _read(res):
            # read the node's URL
            self.webish_url = open(url_file, "r").read().strip()
            if self.webish_url[-1] == "/":
                # trim trailing slash, since the rest of the code wants it gone
                self.webish_url = self.webish_url[:-1]
            f = open(furl_file, "r")
            furl = f.read()
            return furl.strip()
        d.addCallback(_read)
        return d


    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        try:
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
            # the child already died on its own; proc_done still fires
            pass
        return self.proc_done


    def create_data(self, name, size):
        """Write 'size' bytes of filler data to <testdir>/<name>.data and
        return the filename."""
        # NOTE(review): 'f' is never explicitly closed here; CPython closes
        # it when the reference is dropped, but an explicit close (or
        # 'with') would be safer.
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")
        block = "a" * 8192
        while size > 0:
            l = min(size, 8192)
            f.write(block[:l])
            size -= l
        return filename

    def stash_stats(self, stats, name):
        """Append the VmPeak sample to stats.out and remember it in
        self.stats under 'name'."""
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

    def POST(self, urlpath, **fields):
        """Send a multipart/form-data POST to the client's web API.

        Each keyword field becomes a form field; a (filename, value) tuple
        becomes a file-upload field. Returns getPage's Deferred, which
        fires with the response body.
        """
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(value)
            form.append(sep)
        # terminate the final boundary per the multipart spec
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        """GET the given web-API path, discarding the body (optionally
        stalling mid-download; see discardPage)."""
        # ?filename= is needed to provoke the attachment-style response
        # -- presumably; confirm against the web-API docs
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

    def _print_usage(self, res=None):
        """Ask the client for its memory usage, print VmSize/VmPeak, and
        fire with the full stats dict."""
        d = self.control_rref.callRemote("get_memory_usage")
        def _print(stats):
            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
                                                stats["VmPeak"])
            return stats
        d.addCallback(_print)
        return d

    def _do_upload(self, res, size, files, uris):
        """Upload 'size' bytes by whatever path self.mode dictates, and
        record the resulting URI in uris[str(size)]."""
        name = '%d' % size
        print
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            # client-under-test uploads from a local file via its control port
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name].encode("utf-8"),
                                             convergence="check-memory")
            def _done(uri):
                os.remove(files[name])
                del files[name]
                return uri
            d.addCallback(_done)
        elif self.mode == "upload-POST":
            # client-under-test uploads via its web API
            data = "a" * size
            url = "/uri"
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            #
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            #
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res:
                          u.upload(upload.FileName(files[name],
                                                   convergence="check-memory")))
            d.addCallback(lambda results: results.get_uri())
        else:
            raise ValueError("unknown mode=%s" % self.mode)
        def _complete(uri):
            uris[name] = uri
            print "uploaded %s" % name
        d.addCallback(_complete)
        return d

    def _do_download(self, res, size, uris):
        """Have the client-under-test download uris[str(size)]; no-op
        unless self.mode is one of the download modes."""
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            return
        name = '%d' % size
        print "downloading %s" % name
        uri = uris[name]

        if self.mode == "download":
            # download via the control port, to a local file
            d = self.control_rref.callRemote("download_from_uri_to_file",
                                             uri, "dummy.out")
        elif self.mode == "download-GET":
            # download via the web API, discarding the body
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            # same, but stall the pipe after 1MB to exercise backpressure
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)

        def _complete(res):
            print "downloaded %s" % name
            return res
        d.addCallback(_complete)
        return d

    def do_test(self):
        """Run the upload/download workload at 10kB, 10MB, and 50MB,
        sampling memory usage after each file."""
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        #print
        kB = 1000; MB = 1000*1000
        files = {}
        uris = {}

        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")

        # sizes are offset by 'i' so each upload is a distinct file
        for i in range(10):
            d.addCallback(self._do_upload, 10*kB+i, files, uris)
            d.addCallback(self._do_download, 10*kB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")

        for i in range(3):
            d.addCallback(self._do_upload, 10*MB+i, files, uris)
            d.addCallback(self._do_download, 10*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")

        for i in range(1):
            d.addCallback(self._do_upload, 50*MB+i, files, uris)
            d.addCallback(self._do_download, 50*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")

        #for i in range(1):
        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
        #    d.addCallback(self._do_download, 100*MB+i, uris)
        #    d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")

        #d.addCallback(self.stall)
        def _done(res):
            print "FINISHING"
        d.addCallback(_done)
        return d

    def stall(self, res):
        """Return a Deferred that fires (with None) after 5 seconds."""
        d = defer.Deferred()
        reactor.callLater(5, d.callback, None)
        return d
489
490
491 class ClientWatcher(protocol.ProcessProtocol):
492     ended = False
493     def outReceived(self, data):
494         print "OUT:", data
495     def errReceived(self, data):
496         print "ERR:", data
497     def processEnded(self, reason):
498         self.ended = reason
499         self.d.callback(None)
500
501
502 if __name__ == '__main__':
503     mode = "upload"
504     if len(sys.argv) > 1:
505         mode = sys.argv[1]
506     if sys.maxint == 2147483647:
507         bits = "32"
508     elif sys.maxint == 9223372036854775807:
509         bits = "64"
510     else:
511         bits = "?"
512     print "%s-bit system (sys.maxint=%d)" % (bits, sys.maxint)
513     # put the logfile and stats.out in _test_memory/ . These stick around.
514     # put the nodes and other files in _test_memory/test/ . These are
515     # removed each time we run.
516     sf = SystemFramework("_test_memory", mode)
517     sf.run()
518