]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory: add download, download-GET
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage, downloadPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class SystemFramework(testutil.PollMixin):
16     numnodes = 5
17
18     def __init__(self, basedir, mode):
19         self.basedir = basedir = os.path.abspath(basedir)
20         if not basedir.startswith(os.path.abspath(".")):
21             raise AssertionError("safety issue: basedir must be a subdir")
22         self.testdir = testdir = os.path.join(basedir, "test")
23         if os.path.exists(testdir):
24             shutil.rmtree(testdir)
25         fileutil.make_dirs(testdir)
26         self.sparent = service.MultiService()
27         self.sparent.startService()
28         self.proc = None
29         self.tub = foolscap.Tub()
30         self.tub.setServiceParent(self.sparent)
31         self.discard_shares = True
32         self.mode = mode
33         if mode in ("download", "download-GET"):
34             self.discard_shares = False
35         self.failed = False
36
37     def run(self):
38         log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
39                          setStdout=False)
40         #logfile = open(os.path.join(self.testdir, "log"), "w")
41         #flo = log.FileLogObserver(logfile)
42         #log.startLoggingWithObserver(flo.emit, setStdout=False)
43         d = eventual.fireEventually()
44         d.addCallback(lambda res: self.setUp())
45         d.addCallback(lambda res: self.do_test())
46         d.addBoth(self.tearDown)
47         def _err(err):
48             self.failed = err
49             log.err(err)
50             print err
51         d.addErrback(_err)
52         def _done(res):
53             reactor.stop()
54             return res
55         d.addBoth(_done)
56         reactor.run()
57         if self.failed:
58             self.failed.raiseException()
59
60     def setUp(self):
61         #print "STARTING"
62         self.stats = {}
63         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
64         d = self.make_introducer_and_vdrive()
65         def _more(res):
66             self.make_nodes()
67             return self.start_client()
68         d.addCallback(_more)
69         def _record_control_furl(control_furl):
70             self.control_furl = control_furl
71             #print "OBTAINING '%s'" % (control_furl,)
72             return self.tub.getReference(self.control_furl)
73         d.addCallback(_record_control_furl)
74         def _record_control(control_rref):
75             self.control_rref = control_rref
76             return control_rref.callRemote("wait_for_client_connections",
77                                            self.numnodes+1)
78         d.addCallback(_record_control)
79         def _ready(res):
80             #print "CLIENT READY"
81             pass
82         d.addCallback(_ready)
83         return d
84
85     def tearDown(self, passthrough):
86         # the client node will shut down in a few seconds
87         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
88         log.msg("shutting down SystemTest services")
89         d = defer.succeed(None)
90         if self.proc:
91             d.addCallback(lambda res: self.kill_client())
92         d.addCallback(lambda res: self.sparent.stopService())
93         d.addCallback(lambda res: eventual.flushEventualQueue())
94         def _close_statsfile(res):
95             self.statsfile.close()
96         d.addCallback(_close_statsfile)
97         d.addCallback(lambda res: passthrough)
98         return d
99
100     def add_service(self, s):
101         s.setServiceParent(self.sparent)
102         return s
103
104     def make_introducer_and_vdrive(self):
105         iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
106         os.mkdir(iv_basedir)
107         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
108         self.introducer_and_vdrive = self.add_service(iv)
109         d = self.introducer_and_vdrive.when_tub_ready()
110         return d
111
112     def make_nodes(self):
113         q = self.introducer_and_vdrive
114         self.introducer_furl = q.urls["introducer"]
115         self.vdrive_furl = q.urls["vdrive"]
116         self.nodes = []
117         for i in range(self.numnodes):
118             nodedir = os.path.join(self.testdir, "node%d" % i)
119             os.mkdir(nodedir)
120             f = open(os.path.join(nodedir, "introducer.furl"), "w")
121             f.write(self.introducer_furl)
122             f.close()
123             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
124             f.write(self.vdrive_furl)
125             f.close()
126             if self.discard_shares:
127                 # for this test, we tell the storage servers to throw out all
128                 # their stored data, since we're only testing upload and not
129                 # download.
130                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
131                 f.write("no_storage\n")
132                 f.close()
133             c = self.add_service(client.Client(basedir=nodedir))
134             self.nodes.append(c)
135         # the peers will start running, eventually they will connect to each
136         # other and the introducer_and_vdrive
137
138     def touch_keepalive(self):
139         f = open(self.keepalive_file, "w")
140         f.write("""\
141 If the node notices this file at startup, it will poll every 5 seconds and
142 terminate if the file is more than 10 seconds old, or if it has been deleted.
143 If the test harness has an internal failure and neglects to kill off the node
144 itself, this helps to avoid leaving processes lying around. The contents of
145 this file are ignored.
146         """)
147         f.close()
148
149     def start_client(self):
150         # this returns a Deferred that fires with the client's control.furl
151         log.msg("MAKING CLIENT")
152         clientdir = self.clientdir = os.path.join(self.testdir, "client")
153         quiet = StringIO()
154         create_node.create_client(clientdir, {}, out=quiet)
155         log.msg("DONE MAKING CLIENT")
156         f = open(os.path.join(clientdir, "introducer.furl"), "w")
157         f.write(self.introducer_furl + "\n")
158         f.close()
159         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
160         f.write(self.vdrive_furl + "\n")
161         f.close()
162         f = open(os.path.join(clientdir, "webport"), "w")
163         # TODO: ideally we would set webport=0 and then ask the node what
164         # port it picked. But at the moment it is not convenient to do this,
165         # so we just pick a relatively unique one.
166         webport = max(os.getpid(), 2000)
167         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
168         f.close()
169         self.webish_url = "http://localhost:%d" % webport
170         if self.discard_shares:
171             f = open(os.path.join(clientdir, "debug_no_storage"), "w")
172             f.write("no_storage\n")
173             f.close()
174         if self.mode in ("upload-self", "download", "download-GET"):
175             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
176             f.write("push_to_ourselves\n")
177             f.close()
178         self.keepalive_file = os.path.join(clientdir,
179                                            "suicide_prevention_hotline")
180         # now start updating the mtime.
181         self.touch_keepalive()
182         ts = internet.TimerService(4.0, self.touch_keepalive)
183         ts.setServiceParent(self.sparent)
184
185         pp = ClientWatcher()
186         self.proc_done = pp.d = defer.Deferred()
187         logfile = os.path.join(self.basedir, "client.log")
188         cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
189         env = os.environ.copy()
190         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
191         log.msg("CLIENT STARTED")
192
193         # now we wait for the client to get started. we're looking for the
194         # control.furl file to appear.
195         furl_file = os.path.join(clientdir, "control.furl")
196         def _check():
197             if pp.ended and pp.ended.value.status != 0:
198                 # the twistd process ends normally (with rc=0) if the child
199                 # is successfully launched. It ends abnormally (with rc!=0)
200                 # if the child cannot be launched.
201                 raise RuntimeError("process ended while waiting for startup")
202             return os.path.exists(furl_file)
203         d = self.poll(_check, 0.1)
204         # once it exists, wait a moment before we read from it, just in case
205         # it hasn't finished writing the whole thing. Ideally control.furl
206         # would be created in some atomic fashion, or made non-readable until
207         # it's ready, but I can't think of an easy way to do that, and I
208         # think the chances that we'll observe a half-write are pretty low.
209         def _stall(res):
210             d2 = defer.Deferred()
211             reactor.callLater(0.1, d2.callback, None)
212             return d2
213         d.addCallback(_stall)
214         def _read(res):
215             f = open(furl_file, "r")
216             furl = f.read()
217             return furl.strip()
218         d.addCallback(_read)
219         return d
220
221
222     def kill_client(self):
223         # returns a Deferred that fires when the process exits. This may only
224         # be called once.
225         try:
226             self.proc.signalProcess("INT")
227         except error.ProcessExitedAlready:
228             pass
229         return self.proc_done
230
231
232     def create_data(self, name, size):
233         filename = os.path.join(self.testdir, name + ".data")
234         f = open(filename, "wb")
235         block = "a" * 8192
236         while size > 0:
237             l = min(size, 8192)
238             f.write(block[:l])
239             size -= l
240         return filename
241
242     def stash_stats(self, stats, name):
243         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
244         self.statsfile.flush()
245         self.stats[name] = stats['VmPeak']
246
247     def POST(self, urlpath, **fields):
248         url = self.webish_url + urlpath
249         sepbase = "boogabooga"
250         sep = "--" + sepbase
251         form = []
252         form.append(sep)
253         form.append('Content-Disposition: form-data; name="_charset"')
254         form.append('')
255         form.append('UTF-8')
256         form.append(sep)
257         for name, value in fields.iteritems():
258             if isinstance(value, tuple):
259                 filename, value = value
260                 form.append('Content-Disposition: form-data; name="%s"; '
261                             'filename="%s"' % (name, filename))
262             else:
263                 form.append('Content-Disposition: form-data; name="%s"' % name)
264             form.append('')
265             form.append(value)
266             form.append(sep)
267         form[-1] += "--"
268         body = "\r\n".join(form) + "\r\n"
269         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
270                    }
271         return getPage(url, method="POST", postdata=body,
272                        headers=headers, followRedirect=False)
273
274     def GET_discard(self, urlpath):
275         # TODO: Slow
276         url = self.webish_url + urlpath + "?filename=dummy-get.out"
277         return downloadPage(url, os.path.join(self.basedir, "dummy-get.out"))
278
279     def _print_usage(self, res=None):
280         d = self.control_rref.callRemote("get_memory_usage")
281         def _print(stats):
282             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
283                                                 stats["VmPeak"])
284             return stats
285         d.addCallback(_print)
286         return d
287
288     def _do_upload(self, res, size, files, uris):
289         name = '%d' % size
290         print
291         print "uploading %s" % name
292         if self.mode in ("upload", "upload-self"):
293             files[name] = self.create_data(name, size)
294             d = self.control_rref.callRemote("upload_from_file_to_uri",
295                                              files[name])
296             def _done(uri):
297                 os.remove(files[name])
298                 del files[name]
299                 return uri
300             d.addCallback(_done)
301         elif self.mode == "upload-POST":
302             data = "a" * size
303             url = "/vdrive/global"
304             d = self.POST(url, t="upload", file=("%d.data" % size, data))
305         elif self.mode in ("download", "download-GET"):
306             # upload the data from a local peer, then have the
307             # client-under-test download it.
308             files[name] = self.create_data(name, size)
309             u = self.nodes[0].getServiceNamed("uploader")
310             d = u.upload_filename(files[name])
311         else:
312             raise RuntimeError("unknown mode=%s" % self.mode)
313         def _complete(uri):
314             uris[name] = uri
315             print "uploaded %s" % name
316         d.addCallback(_complete)
317         return d
318
319     def _do_download(self, res, size, uris):
320         if self.mode not in ("download", "download-GET"):
321             return
322         name = '%d' % size
323         uri = uris[name]
324         if self.mode == "download":
325             d = self.control_rref.callRemote("download_from_uri_to_file",
326                                              uri, "dummy.out")
327         if self.mode == "download-GET":
328             url = "/uri/%s" % uri
329             d = self.GET_discard(urllib.quote(url))
330
331         return d
332
333     def do_test(self):
334         #print "CLIENT STARTED"
335         #print "FURL", self.control_furl
336         #print "RREF", self.control_rref
337         #print
338         kB = 1000; MB = 1000*1000
339         files = {}
340         uris = {}
341
342         d = self._print_usage()
343         d.addCallback(self.stash_stats, "0B")
344
345         for i in range(10):
346             d.addCallback(self._do_upload, 10*kB+i, files, uris)
347             d.addCallback(self._do_download, 10*kB+i, uris)
348             d.addCallback(self._print_usage)
349         d.addCallback(self.stash_stats, "10kB")
350
351         for i in range(3):
352             d.addCallback(self._do_upload, 10*MB+i, files, uris)
353             d.addCallback(self._do_download, 10*MB+i, uris)
354             d.addCallback(self._print_usage)
355         d.addCallback(self.stash_stats, "10MB")
356
357         for i in range(1):
358             d.addCallback(self._do_upload, 50*MB+i, files, uris)
359             d.addCallback(self._do_download, 50*MB+i, uris)
360             d.addCallback(self._print_usage)
361         d.addCallback(self.stash_stats, "50MB")
362
363         #for i in range(1):
364         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
365         #    d.addCallback(self._do_download, 100*MB+i, uris)
366         #    d.addCallback(self._print_usage)
367         #d.addCallback(self.stash_stats, "100MB")
368
369         #d.addCallback(self.stall)
370         def _done(res):
371             print "FINISHING"
372         d.addCallback(_done)
373         return d
374
375     def stall(self, res):
376         d = defer.Deferred()
377         reactor.callLater(5, d.callback, None)
378         return d
379
380
381 class ClientWatcher(protocol.ProcessProtocol):
382     ended = False
383     def outReceived(self, data):
384         print "OUT:", data
385     def errReceived(self, data):
386         print "ERR:", data
387     def processEnded(self, reason):
388         self.ended = reason
389         self.d.callback(None)
390
391
if __name__ == '__main__':
    # usage: check_memory.py [MODE]    (MODE defaults to "upload")
    #
    # The logfile and stats.out are written to _test_memory/ and are kept
    # between runs. The nodes and other scratch files go in
    # _test_memory/test/, which is wiped at the start of every run.
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    else:
        mode = "upload"
    SystemFramework("_test_memory", mode).run()
401