git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
upload: oops, fix breakage after removing upload_file/upload_data/etc
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib, time, stat
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer, upload
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16     full_speed_ahead = False
17     _bytes_so_far = 0
18     stalled = None
19     def handleResponsePart(self, data):
20         self._bytes_so_far += len(data)
21         if not self.factory.do_stall:
22             return
23         if self.full_speed_ahead:
24             return
25         if self._bytes_so_far > 1e6+100:
26             if not self.stalled:
27                 print "STALLING"
28                 self.transport.pauseProducing()
29                 self.stalled = reactor.callLater(10.0, self._resume_speed)
30     def _resume_speed(self):
31         print "RESUME SPEED"
32         self.stalled = None
33         self.full_speed_ahead = True
34         self.transport.resumeProducing()
35     def handleResponseEnd(self):
36         if self.stalled:
37             print "CANCEL"
38             self.stalled.cancel()
39             self.stalled = None
40         return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory whose connections use StallableHTTPGetterDiscarder.

    The protocol reads ``self.factory.do_stall``, so whoever creates this
    factory must set a ``do_stall`` attribute on it before response data
    arrives.
    """

    protocol = StallableHTTPGetterDiscarder
44
def discardPage(url, stall=False, *args, **kwargs):
    """Fetch an http:// URL and discard the response body.

    If ``stall`` is true, reading pauses once a little over 1MB (1e6+100
    bytes) of body has arrived, waits 10 seconds, then resumes downloading
    (and discarding) the rest. If ``stall`` is false the body is read at
    full speed.

    Any extra positional/keyword arguments are passed unchanged to the
    StallableDiscardingHTTPClientFactory (HTTPClientFactory) constructor.

    Returns the factory's Deferred.
    Raises ValueError if ``url`` is not an http:// URL.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    # Validate before building the factory or touching the network. A bare
    # assert would vanish under "python -O" and let e.g. an https:// URL be
    # fetched over a plain TCP connection.
    if scheme != 'http':
        raise ValueError("discardPage only supports http:// URLs, not %r"
                         % (url,))
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    reactor.connectTCP(host, port, factory)
    return factory.deferred
58
59 class SystemFramework(testutil.PollMixin):
60     numnodes = 5
61
62     def __init__(self, basedir, mode):
63         self.basedir = basedir = os.path.abspath(basedir)
64         if not basedir.startswith(os.path.abspath(".")):
65             raise AssertionError("safety issue: basedir must be a subdir")
66         self.testdir = testdir = os.path.join(basedir, "test")
67         if os.path.exists(testdir):
68             shutil.rmtree(testdir)
69         fileutil.make_dirs(testdir)
70         self.sparent = service.MultiService()
71         self.sparent.startService()
72         self.proc = None
73         self.tub = foolscap.Tub()
74         self.tub.setServiceParent(self.sparent)
75         self.mode = mode
76         self.failed = False
77         self.keepalive_file = None
78
79     def run(self):
80         framelog = os.path.join(self.basedir, "driver.log")
81         log.startLogging(open(framelog, "a"), setStdout=False)
82         log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
83         #logfile = open(os.path.join(self.testdir, "log"), "w")
84         #flo = log.FileLogObserver(logfile)
85         #log.startLoggingWithObserver(flo.emit, setStdout=False)
86         d = eventual.fireEventually()
87         d.addCallback(lambda res: self.setUp())
88         d.addCallback(lambda res: self.record_initial_memusage())
89         d.addCallback(lambda res: self.make_nodes())
90         d.addCallback(lambda res: self.wait_for_client_connected())
91         d.addCallback(lambda res: self.do_test())
92         d.addBoth(self.tearDown)
93         def _err(err):
94             self.failed = err
95             log.err(err)
96             print err
97         d.addErrback(_err)
98         def _done(res):
99             reactor.stop()
100             return res
101         d.addBoth(_done)
102         reactor.run()
103         if self.failed:
104             # raiseException doesn't work for CopiedFailures
105             self.failed.raiseException()
106
107     def setUp(self):
108         #print "STARTING"
109         self.stats = {}
110         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
111         d = self.make_introducer()
112         def _more(res):
113             return self.start_client()
114         d.addCallback(_more)
115         def _record_control_furl(control_furl):
116             self.control_furl = control_furl
117             #print "OBTAINING '%s'" % (control_furl,)
118             return self.tub.getReference(self.control_furl)
119         d.addCallback(_record_control_furl)
120         def _record_control(control_rref):
121             self.control_rref = control_rref
122         d.addCallback(_record_control)
123         def _ready(res):
124             #print "CLIENT READY"
125             pass
126         d.addCallback(_ready)
127         return d
128
129     def record_initial_memusage(self):
130         print
131         print "Client started (no connections yet)"
132         d = self._print_usage()
133         d.addCallback(self.stash_stats, "init")
134         return d
135
136     def wait_for_client_connected(self):
137         print
138         print "Client connecting to other nodes.."
139         return self.control_rref.callRemote("wait_for_client_connections",
140                                             self.numnodes+1)
141
142     def tearDown(self, passthrough):
143         # the client node will shut down in a few seconds
144         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
145         log.msg("shutting down SystemTest services")
146         if self.keepalive_file and os.path.exists(self.keepalive_file):
147             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
148             log.msg("keepalive file at shutdown was %ds old" % age)
149         d = defer.succeed(None)
150         if self.proc:
151             d.addCallback(lambda res: self.kill_client())
152         d.addCallback(lambda res: self.sparent.stopService())
153         d.addCallback(lambda res: eventual.flushEventualQueue())
154         def _close_statsfile(res):
155             self.statsfile.close()
156         d.addCallback(_close_statsfile)
157         d.addCallback(lambda res: passthrough)
158         return d
159
160     def add_service(self, s):
161         s.setServiceParent(self.sparent)
162         return s
163
164     def make_introducer(self):
165         iv_basedir = os.path.join(self.testdir, "introducer")
166         os.mkdir(iv_basedir)
167         iv = introducer.IntroducerNode(basedir=iv_basedir)
168         self.introducer = self.add_service(iv)
169         d = self.introducer.when_tub_ready()
170         def _introducer_ready(res):
171             q = self.introducer
172             self.introducer_furl = q.introducer_url
173         d.addCallback(_introducer_ready)
174         return d
175
176     def make_nodes(self):
177         self.nodes = []
178         for i in range(self.numnodes):
179             nodedir = os.path.join(self.testdir, "node%d" % i)
180             os.mkdir(nodedir)
181             f = open(os.path.join(nodedir, "introducer.furl"), "w")
182             f.write(self.introducer_furl)
183             f.close()
184             # the only tests for which we want the internal nodes to actually
185             # retain shares are the ones where somebody's going to download
186             # them.
187             if self.mode in ("download", "download-GET", "download-GET-slow"):
188                 # retain shares
189                 pass
190             else:
191                 # for these tests, we tell the storage servers to pretend to
192                 # accept shares, but really just throw them out, since we're
193                 # only testing upload and not download.
194                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
195                 f.write("no_storage\n")
196                 f.close()
197             if self.mode in ("receive",):
198                 # for this mode, the client-under-test gets all the shares,
199                 # so our internal nodes can refuse requests
200                 f = open(os.path.join(nodedir, "sizelimit"), "w")
201                 f.write("0\n")
202                 f.close()
203             c = self.add_service(client.Client(basedir=nodedir))
204             self.nodes.append(c)
205         # the peers will start running, eventually they will connect to each
206         # other and the introducer
207
208     def touch_keepalive(self):
209         if os.path.exists(self.keepalive_file):
210             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
211             log.msg("touching keepalive file, was %ds old" % age)
212         f = open(self.keepalive_file, "w")
213         f.write("""\
214 If the node notices this file at startup, it will poll every 5 seconds and
215 terminate if the file is more than 10 seconds old, or if it has been deleted.
216 If the test harness has an internal failure and neglects to kill off the node
217 itself, this helps to avoid leaving processes lying around. The contents of
218 this file are ignored.
219         """)
220         f.close()
221
222     def start_client(self):
223         # this returns a Deferred that fires with the client's control.furl
224         log.msg("MAKING CLIENT")
225         clientdir = self.clientdir = os.path.join(self.testdir, "client")
226         quiet = StringIO()
227         create_node.create_client(clientdir, {}, out=quiet)
228         log.msg("DONE MAKING CLIENT")
229         f = open(os.path.join(clientdir, "introducer.furl"), "w")
230         f.write(self.introducer_furl + "\n")
231         f.close()
232         f = open(os.path.join(clientdir, "webport"), "w")
233         # TODO: ideally we would set webport=0 and then ask the node what
234         # port it picked. But at the moment it is not convenient to do this,
235         # so we just pick a relatively unique one.
236         webport = max(os.getpid(), 2000)
237         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
238         f.close()
239         self.webish_url = "http://localhost:%d" % webport
240         if self.mode in ("upload-self", "receive"):
241             # accept and store shares, to trigger the memory consumption bugs
242             pass
243         else:
244             # don't accept any shares
245             f = open(os.path.join(clientdir, "sizelimit"), "w")
246             f.write("0\n")
247             f.close()
248             ## also, if we do receive any shares, throw them away
249             #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
250             #f.write("no_storage\n")
251             #f.close()
252         if self.mode == "upload-self":
253             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
254             f.write("push_to_ourselves\n")
255             f.close()
256         self.keepalive_file = os.path.join(clientdir,
257                                            "suicide_prevention_hotline")
258         # now start updating the mtime.
259         self.touch_keepalive()
260         ts = internet.TimerService(1.0, self.touch_keepalive)
261         ts.setServiceParent(self.sparent)
262
263         pp = ClientWatcher()
264         self.proc_done = pp.d = defer.Deferred()
265         logfile = os.path.join(self.basedir, "client.log")
266         cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
267         env = os.environ.copy()
268         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
269         log.msg("CLIENT STARTED")
270
271         # now we wait for the client to get started. we're looking for the
272         # control.furl file to appear.
273         furl_file = os.path.join(clientdir, "private", "control.furl")
274         def _check():
275             if pp.ended and pp.ended.value.status != 0:
276                 # the twistd process ends normally (with rc=0) if the child
277                 # is successfully launched. It ends abnormally (with rc!=0)
278                 # if the child cannot be launched.
279                 raise RuntimeError("process ended while waiting for startup")
280             return os.path.exists(furl_file)
281         d = self.poll(_check, 0.1)
282         # once it exists, wait a moment before we read from it, just in case
283         # it hasn't finished writing the whole thing. Ideally control.furl
284         # would be created in some atomic fashion, or made non-readable until
285         # it's ready, but I can't think of an easy way to do that, and I
286         # think the chances that we'll observe a half-write are pretty low.
287         def _stall(res):
288             d2 = defer.Deferred()
289             reactor.callLater(0.1, d2.callback, None)
290             return d2
291         d.addCallback(_stall)
292         def _read(res):
293             f = open(furl_file, "r")
294             furl = f.read()
295             return furl.strip()
296         d.addCallback(_read)
297         return d
298
299
300     def kill_client(self):
301         # returns a Deferred that fires when the process exits. This may only
302         # be called once.
303         try:
304             self.proc.signalProcess("INT")
305         except error.ProcessExitedAlready:
306             pass
307         return self.proc_done
308
309
310     def create_data(self, name, size):
311         filename = os.path.join(self.testdir, name + ".data")
312         f = open(filename, "wb")
313         block = "a" * 8192
314         while size > 0:
315             l = min(size, 8192)
316             f.write(block[:l])
317             size -= l
318         return filename
319
320     def stash_stats(self, stats, name):
321         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
322         self.statsfile.flush()
323         self.stats[name] = stats['VmPeak']
324
325     def POST(self, urlpath, **fields):
326         url = self.webish_url + urlpath
327         sepbase = "boogabooga"
328         sep = "--" + sepbase
329         form = []
330         form.append(sep)
331         form.append('Content-Disposition: form-data; name="_charset"')
332         form.append('')
333         form.append('UTF-8')
334         form.append(sep)
335         for name, value in fields.iteritems():
336             if isinstance(value, tuple):
337                 filename, value = value
338                 form.append('Content-Disposition: form-data; name="%s"; '
339                             'filename="%s"' % (name, filename))
340             else:
341                 form.append('Content-Disposition: form-data; name="%s"' % name)
342             form.append('')
343             form.append(value)
344             form.append(sep)
345         form[-1] += "--"
346         body = "\r\n".join(form) + "\r\n"
347         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
348                    }
349         return tw_client.getPage(url, method="POST", postdata=body,
350                                  headers=headers, followRedirect=False)
351
352     def GET_discard(self, urlpath, stall):
353         url = self.webish_url + urlpath + "?filename=dummy-get.out"
354         return discardPage(url, stall)
355
356     def _print_usage(self, res=None):
357         d = self.control_rref.callRemote("get_memory_usage")
358         def _print(stats):
359             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
360                                                 stats["VmPeak"])
361             return stats
362         d.addCallback(_print)
363         return d
364
365     def _do_upload(self, res, size, files, uris):
366         name = '%d' % size
367         print
368         print "uploading %s" % name
369         if self.mode in ("upload", "upload-self"):
370             files[name] = self.create_data(name, size)
371             d = self.control_rref.callRemote("upload_from_file_to_uri",
372                                              files[name])
373             def _done(uri):
374                 os.remove(files[name])
375                 del files[name]
376                 return uri
377             d.addCallback(_done)
378         elif self.mode == "upload-POST":
379             data = "a" * size
380             url = "/uri"
381             d = self.POST(url, t="upload", file=("%d.data" % size, data))
382         elif self.mode in ("receive",
383                            "download", "download-GET", "download-GET-slow"):
384             # mode=receive: upload the data from a local peer, so that the
385             # client-under-test receives and stores the shares
386             #
387             # mode=download*: upload the data from a local peer, then have
388             # the client-under-test download it.
389             #
390             # we need to wait until the uploading node has connected to all
391             # peers, since the wait_for_client_connections() above doesn't
392             # pay attention to our self.nodes[] and their connections.
393             files[name] = self.create_data(name, size)
394             u = self.nodes[0].getServiceNamed("uploader")
395             d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
396             d.addCallback(lambda res: u.upload(upload.FileName(files[name])))
397         else:
398             raise RuntimeError("unknown mode=%s" % self.mode)
399         def _complete(uri):
400             uris[name] = uri
401             print "uploaded %s" % name
402         d.addCallback(_complete)
403         return d
404
405     def _do_download(self, res, size, uris):
406         if self.mode not in ("download", "download-GET", "download-GET-slow"):
407             return
408         name = '%d' % size
409         print "downloading %s" % name
410         uri = uris[name]
411
412         if self.mode == "download":
413             d = self.control_rref.callRemote("download_from_uri_to_file",
414                                              uri, "dummy.out")
415         elif self.mode == "download-GET":
416             url = "/uri/%s" % uri
417             d = self.GET_discard(urllib.quote(url), stall=False)
418         elif self.mode == "download-GET-slow":
419             url = "/uri/%s" % uri
420             d = self.GET_discard(urllib.quote(url), stall=True)
421
422         def _complete(res):
423             print "downloaded %s" % name
424             return res
425         d.addCallback(_complete)
426         return d
427
428     def do_test(self):
429         #print "CLIENT STARTED"
430         #print "FURL", self.control_furl
431         #print "RREF", self.control_rref
432         #print
433         kB = 1000; MB = 1000*1000
434         files = {}
435         uris = {}
436
437         d = self._print_usage()
438         d.addCallback(self.stash_stats, "0B")
439
440         for i in range(10):
441             d.addCallback(self._do_upload, 10*kB+i, files, uris)
442             d.addCallback(self._do_download, 10*kB+i, uris)
443             d.addCallback(self._print_usage)
444         d.addCallback(self.stash_stats, "10kB")
445
446         for i in range(3):
447             d.addCallback(self._do_upload, 10*MB+i, files, uris)
448             d.addCallback(self._do_download, 10*MB+i, uris)
449             d.addCallback(self._print_usage)
450         d.addCallback(self.stash_stats, "10MB")
451
452         for i in range(1):
453             d.addCallback(self._do_upload, 50*MB+i, files, uris)
454             d.addCallback(self._do_download, 50*MB+i, uris)
455             d.addCallback(self._print_usage)
456         d.addCallback(self.stash_stats, "50MB")
457
458         #for i in range(1):
459         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
460         #    d.addCallback(self._do_download, 100*MB+i, uris)
461         #    d.addCallback(self._print_usage)
462         #d.addCallback(self.stash_stats, "100MB")
463
464         #d.addCallback(self.stall)
465         def _done(res):
466             print "FINISHING"
467         d.addCallback(_done)
468         return d
469
470     def stall(self, res):
471         d = defer.Deferred()
472         reactor.callLater(5, d.callback, None)
473         return d
474
475
476 class ClientWatcher(protocol.ProcessProtocol):
477     ended = False
478     def outReceived(self, data):
479         print "OUT:", data
480     def errReceived(self, data):
481         print "ERR:", data
482     def processEnded(self, reason):
483         self.ended = reason
484         self.d.callback(None)
485
486
if __name__ == '__main__':
    # usage: check_memory.py [MODE]     (MODE defaults to "upload")
    #
    # The logfile and stats.out go in _test_memory/ and stick around.
    # The nodes and other files go in _test_memory/test/ and are removed
    # each time we run.
    cli_args = sys.argv[1:]
    mode = "upload"
    if cli_args:
        mode = cli_args[0]
    framework = SystemFramework("_test_memory", mode)
    framework.run()
496