]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory: don't accept shares for download/download-GET test, since that hits...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage, downloadPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class SystemFramework(testutil.PollMixin):
16     numnodes = 5
17
18     def __init__(self, basedir, mode):
19         self.basedir = basedir = os.path.abspath(basedir)
20         if not basedir.startswith(os.path.abspath(".")):
21             raise AssertionError("safety issue: basedir must be a subdir")
22         self.testdir = testdir = os.path.join(basedir, "test")
23         if os.path.exists(testdir):
24             shutil.rmtree(testdir)
25         fileutil.make_dirs(testdir)
26         self.sparent = service.MultiService()
27         self.sparent.startService()
28         self.proc = None
29         self.tub = foolscap.Tub()
30         self.tub.setServiceParent(self.sparent)
31         self.discard_shares = True
32         self.mode = mode
33         if mode in ("download", "download-GET"):
34             self.discard_shares = False
35         self.failed = False
36
37     def run(self):
38         log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
39                          setStdout=False)
40         #logfile = open(os.path.join(self.testdir, "log"), "w")
41         #flo = log.FileLogObserver(logfile)
42         #log.startLoggingWithObserver(flo.emit, setStdout=False)
43         d = eventual.fireEventually()
44         d.addCallback(lambda res: self.setUp())
45         d.addCallback(lambda res: self.do_test())
46         d.addBoth(self.tearDown)
47         def _err(err):
48             self.failed = err
49             log.err(err)
50             print err
51         d.addErrback(_err)
52         def _done(res):
53             reactor.stop()
54             return res
55         d.addBoth(_done)
56         reactor.run()
57         if self.failed:
58             self.failed.raiseException()
59
60     def setUp(self):
61         #print "STARTING"
62         self.stats = {}
63         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
64         d = self.make_introducer_and_vdrive()
65         def _more(res):
66             self.make_nodes()
67             return self.start_client()
68         d.addCallback(_more)
69         def _record_control_furl(control_furl):
70             self.control_furl = control_furl
71             #print "OBTAINING '%s'" % (control_furl,)
72             return self.tub.getReference(self.control_furl)
73         d.addCallback(_record_control_furl)
74         def _record_control(control_rref):
75             self.control_rref = control_rref
76             return control_rref.callRemote("wait_for_client_connections",
77                                            self.numnodes+1)
78         d.addCallback(_record_control)
79         def _ready(res):
80             #print "CLIENT READY"
81             pass
82         d.addCallback(_ready)
83         return d
84
85     def tearDown(self, passthrough):
86         # the client node will shut down in a few seconds
87         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
88         log.msg("shutting down SystemTest services")
89         d = defer.succeed(None)
90         if self.proc:
91             d.addCallback(lambda res: self.kill_client())
92         d.addCallback(lambda res: self.sparent.stopService())
93         d.addCallback(lambda res: eventual.flushEventualQueue())
94         def _close_statsfile(res):
95             self.statsfile.close()
96         d.addCallback(_close_statsfile)
97         d.addCallback(lambda res: passthrough)
98         return d
99
100     def add_service(self, s):
101         s.setServiceParent(self.sparent)
102         return s
103
104     def make_introducer_and_vdrive(self):
105         iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
106         os.mkdir(iv_basedir)
107         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
108         self.introducer_and_vdrive = self.add_service(iv)
109         d = self.introducer_and_vdrive.when_tub_ready()
110         return d
111
112     def make_nodes(self):
113         q = self.introducer_and_vdrive
114         self.introducer_furl = q.urls["introducer"]
115         self.vdrive_furl = q.urls["vdrive"]
116         self.nodes = []
117         for i in range(self.numnodes):
118             nodedir = os.path.join(self.testdir, "node%d" % i)
119             os.mkdir(nodedir)
120             f = open(os.path.join(nodedir, "introducer.furl"), "w")
121             f.write(self.introducer_furl)
122             f.close()
123             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
124             f.write(self.vdrive_furl)
125             f.close()
126             if self.discard_shares:
127                 # for this test, we tell the storage servers to throw out all
128                 # their stored data, since we're only testing upload and not
129                 # download.
130                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
131                 f.write("no_storage\n")
132                 f.close()
133             c = self.add_service(client.Client(basedir=nodedir))
134             self.nodes.append(c)
135         # the peers will start running, eventually they will connect to each
136         # other and the introducer_and_vdrive
137
138     def touch_keepalive(self):
139         f = open(self.keepalive_file, "w")
140         f.write("""\
141 If the node notices this file at startup, it will poll every 5 seconds and
142 terminate if the file is more than 10 seconds old, or if it has been deleted.
143 If the test harness has an internal failure and neglects to kill off the node
144 itself, this helps to avoid leaving processes lying around. The contents of
145 this file are ignored.
146         """)
147         f.close()
148
149     def start_client(self):
150         # this returns a Deferred that fires with the client's control.furl
151         log.msg("MAKING CLIENT")
152         clientdir = self.clientdir = os.path.join(self.testdir, "client")
153         quiet = StringIO()
154         create_node.create_client(clientdir, {}, out=quiet)
155         log.msg("DONE MAKING CLIENT")
156         f = open(os.path.join(clientdir, "introducer.furl"), "w")
157         f.write(self.introducer_furl + "\n")
158         f.close()
159         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
160         f.write(self.vdrive_furl + "\n")
161         f.close()
162         f = open(os.path.join(clientdir, "webport"), "w")
163         # TODO: ideally we would set webport=0 and then ask the node what
164         # port it picked. But at the moment it is not convenient to do this,
165         # so we just pick a relatively unique one.
166         webport = max(os.getpid(), 2000)
167         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
168         f.close()
169         self.webish_url = "http://localhost:%d" % webport
170         if self.discard_shares:
171             f = open(os.path.join(clientdir, "debug_no_storage"), "w")
172             f.write("no_storage\n")
173             f.close()
174         if self.mode in ("upload-self"):
175             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
176             f.write("push_to_ourselves\n")
177             f.close()
178         else:
179             f = open(os.path.join(clientdir, "sizelimit"), "w")
180             f.write("0\n")
181             f.close()
182         self.keepalive_file = os.path.join(clientdir,
183                                            "suicide_prevention_hotline")
184         # now start updating the mtime.
185         self.touch_keepalive()
186         ts = internet.TimerService(4.0, self.touch_keepalive)
187         ts.setServiceParent(self.sparent)
188
189         pp = ClientWatcher()
190         self.proc_done = pp.d = defer.Deferred()
191         logfile = os.path.join(self.basedir, "client.log")
192         cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
193         env = os.environ.copy()
194         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
195         log.msg("CLIENT STARTED")
196
197         # now we wait for the client to get started. we're looking for the
198         # control.furl file to appear.
199         furl_file = os.path.join(clientdir, "control.furl")
200         def _check():
201             if pp.ended and pp.ended.value.status != 0:
202                 # the twistd process ends normally (with rc=0) if the child
203                 # is successfully launched. It ends abnormally (with rc!=0)
204                 # if the child cannot be launched.
205                 raise RuntimeError("process ended while waiting for startup")
206             return os.path.exists(furl_file)
207         d = self.poll(_check, 0.1)
208         # once it exists, wait a moment before we read from it, just in case
209         # it hasn't finished writing the whole thing. Ideally control.furl
210         # would be created in some atomic fashion, or made non-readable until
211         # it's ready, but I can't think of an easy way to do that, and I
212         # think the chances that we'll observe a half-write are pretty low.
213         def _stall(res):
214             d2 = defer.Deferred()
215             reactor.callLater(0.1, d2.callback, None)
216             return d2
217         d.addCallback(_stall)
218         def _read(res):
219             f = open(furl_file, "r")
220             furl = f.read()
221             return furl.strip()
222         d.addCallback(_read)
223         return d
224
225
226     def kill_client(self):
227         # returns a Deferred that fires when the process exits. This may only
228         # be called once.
229         try:
230             self.proc.signalProcess("INT")
231         except error.ProcessExitedAlready:
232             pass
233         return self.proc_done
234
235
236     def create_data(self, name, size):
237         filename = os.path.join(self.testdir, name + ".data")
238         f = open(filename, "wb")
239         block = "a" * 8192
240         while size > 0:
241             l = min(size, 8192)
242             f.write(block[:l])
243             size -= l
244         return filename
245
246     def stash_stats(self, stats, name):
247         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
248         self.statsfile.flush()
249         self.stats[name] = stats['VmPeak']
250
251     def POST(self, urlpath, **fields):
252         url = self.webish_url + urlpath
253         sepbase = "boogabooga"
254         sep = "--" + sepbase
255         form = []
256         form.append(sep)
257         form.append('Content-Disposition: form-data; name="_charset"')
258         form.append('')
259         form.append('UTF-8')
260         form.append(sep)
261         for name, value in fields.iteritems():
262             if isinstance(value, tuple):
263                 filename, value = value
264                 form.append('Content-Disposition: form-data; name="%s"; '
265                             'filename="%s"' % (name, filename))
266             else:
267                 form.append('Content-Disposition: form-data; name="%s"' % name)
268             form.append('')
269             form.append(value)
270             form.append(sep)
271         form[-1] += "--"
272         body = "\r\n".join(form) + "\r\n"
273         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
274                    }
275         return getPage(url, method="POST", postdata=body,
276                        headers=headers, followRedirect=False)
277
278     def GET_discard(self, urlpath):
279         # TODO: Slow
280         url = self.webish_url + urlpath + "?filename=dummy-get.out"
281         return downloadPage(url, os.path.join(self.basedir, "dummy-get.out"))
282
283     def _print_usage(self, res=None):
284         d = self.control_rref.callRemote("get_memory_usage")
285         def _print(stats):
286             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
287                                                 stats["VmPeak"])
288             return stats
289         d.addCallback(_print)
290         return d
291
292     def _do_upload(self, res, size, files, uris):
293         name = '%d' % size
294         print
295         print "uploading %s" % name
296         if self.mode in ("upload", "upload-self"):
297             files[name] = self.create_data(name, size)
298             d = self.control_rref.callRemote("upload_from_file_to_uri",
299                                              files[name])
300             def _done(uri):
301                 os.remove(files[name])
302                 del files[name]
303                 return uri
304             d.addCallback(_done)
305         elif self.mode == "upload-POST":
306             data = "a" * size
307             url = "/vdrive/global"
308             d = self.POST(url, t="upload", file=("%d.data" % size, data))
309         elif self.mode in ("download", "download-GET"):
310             # upload the data from a local peer, then have the
311             # client-under-test download it.
312             files[name] = self.create_data(name, size)
313             u = self.nodes[0].getServiceNamed("uploader")
314             d = u.upload_filename(files[name])
315         else:
316             raise RuntimeError("unknown mode=%s" % self.mode)
317         def _complete(uri):
318             uris[name] = uri
319             print "uploaded %s" % name
320         d.addCallback(_complete)
321         return d
322
323     def _do_download(self, res, size, uris):
324         if self.mode not in ("download", "download-GET"):
325             return
326         name = '%d' % size
327         uri = uris[name]
328         if self.mode == "download":
329             d = self.control_rref.callRemote("download_from_uri_to_file",
330                                              uri, "dummy.out")
331         if self.mode == "download-GET":
332             url = "/uri/%s" % uri
333             d = self.GET_discard(urllib.quote(url))
334
335         return d
336
337     def do_test(self):
338         #print "CLIENT STARTED"
339         #print "FURL", self.control_furl
340         #print "RREF", self.control_rref
341         #print
342         kB = 1000; MB = 1000*1000
343         files = {}
344         uris = {}
345
346         d = self._print_usage()
347         d.addCallback(self.stash_stats, "0B")
348
349         for i in range(10):
350             d.addCallback(self._do_upload, 10*kB+i, files, uris)
351             d.addCallback(self._do_download, 10*kB+i, uris)
352             d.addCallback(self._print_usage)
353         d.addCallback(self.stash_stats, "10kB")
354
355         for i in range(3):
356             d.addCallback(self._do_upload, 10*MB+i, files, uris)
357             d.addCallback(self._do_download, 10*MB+i, uris)
358             d.addCallback(self._print_usage)
359         d.addCallback(self.stash_stats, "10MB")
360
361         for i in range(1):
362             d.addCallback(self._do_upload, 50*MB+i, files, uris)
363             d.addCallback(self._do_download, 50*MB+i, uris)
364             d.addCallback(self._print_usage)
365         d.addCallback(self.stash_stats, "50MB")
366
367         #for i in range(1):
368         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
369         #    d.addCallback(self._do_download, 100*MB+i, uris)
370         #    d.addCallback(self._print_usage)
371         #d.addCallback(self.stash_stats, "100MB")
372
373         #d.addCallback(self.stall)
374         def _done(res):
375             print "FINISHING"
376         d.addCallback(_done)
377         return d
378
379     def stall(self, res):
380         d = defer.Deferred()
381         reactor.callLater(5, d.callback, None)
382         return d
383
384
385 class ClientWatcher(protocol.ProcessProtocol):
386     ended = False
387     def outReceived(self, data):
388         print "OUT:", data
389     def errReceived(self, data):
390         print "ERR:", data
391     def processEnded(self, reason):
392         self.ended = reason
393         self.d.callback(None)
394
395
if __name__ == '__main__':
    # usage: check_memory.py [MODE]    (MODE defaults to "upload")
    # client.log and stats.out are written to _test_memory/ and persist
    # across runs. The nodes, the twisted log, and other scratch files go
    # in _test_memory/test/, which is wiped at the start of every run.
    chosen_mode = (sys.argv[1:] + ["upload"])[0]
    framework = SystemFramework("_test_memory", chosen_mode)
    framework.run()
405