]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory.py: include a single 100MB test
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class SystemFramework(testutil.PollMixin):
16     numnodes = 5
17
18     def __init__(self, basedir, mode):
19         self.basedir = basedir = os.path.abspath(basedir)
20         if not basedir.startswith(os.path.abspath(".")):
21             raise AssertionError("safety issue: basedir must be a subdir")
22         if os.path.exists(basedir):
23             shutil.rmtree(basedir)
24         os.mkdir(basedir)
25         self.sparent = service.MultiService()
26         self.sparent.startService()
27         self.proc = None
28         self.tub = foolscap.Tub()
29         self.tub.setServiceParent(self.sparent)
30         self.discard_shares = True
31         self.mode = mode
32
33     def run(self):
34         log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
35                          setStdout=False)
36         #logfile = open(os.path.join(self.basedir, "log"), "w")
37         #flo = log.FileLogObserver(logfile)
38         #log.startLoggingWithObserver(flo.emit, setStdout=False)
39         d = eventual.fireEventually()
40         d.addCallback(lambda res: self.setUp())
41         d.addCallback(lambda res: self.do_test())
42         d.addBoth(self.tearDown)
43         def _err(err):
44             log.err(err)
45             print err
46         d.addErrback(_err)
47         d.addBoth(lambda res: reactor.stop())
48         reactor.run()
49
50     def setUp(self):
51         #print "STARTING"
52         self.stats = {}
53         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
54         d = self.make_introducer_and_vdrive()
55         def _more(res):
56             self.make_nodes()
57             return self.start_client()
58         d.addCallback(_more)
59         def _record_control_furl(control_furl):
60             self.control_furl = control_furl
61             #print "OBTAINING '%s'" % (control_furl,)
62             return self.tub.getReference(self.control_furl)
63         d.addCallback(_record_control_furl)
64         def _record_control(control_rref):
65             self.control_rref = control_rref
66             return control_rref.callRemote("wait_for_client_connections",
67                                            self.numnodes+1)
68         d.addCallback(_record_control)
69         def _ready(res):
70             #print "CLIENT READY"
71             pass
72         d.addCallback(_ready)
73         return d
74
75     def tearDown(self, passthrough):
76         # the client node will shut down in a few seconds
77         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
78         log.msg("shutting down SystemTest services")
79         d = defer.succeed(None)
80         if self.proc:
81             d.addCallback(lambda res: self.kill_client())
82         d.addCallback(lambda res: self.sparent.stopService())
83         d.addCallback(lambda res: eventual.flushEventualQueue())
84         def _close_statsfile(res):
85             self.statsfile.close()
86         d.addCallback(_close_statsfile)
87         d.addCallback(lambda res: passthrough)
88         return d
89
90     def add_service(self, s):
91         s.setServiceParent(self.sparent)
92         return s
93
94     def make_introducer_and_vdrive(self):
95         iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
96         os.mkdir(iv_basedir)
97         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
98         self.introducer_and_vdrive = self.add_service(iv)
99         d = self.introducer_and_vdrive.when_tub_ready()
100         return d
101
102     def make_nodes(self):
103         q = self.introducer_and_vdrive
104         self.introducer_furl = q.urls["introducer"]
105         self.vdrive_furl = q.urls["vdrive"]
106         self.nodes = []
107         for i in range(self.numnodes):
108             nodedir = os.path.join(self.basedir, "node%d" % i)
109             os.mkdir(nodedir)
110             f = open(os.path.join(nodedir, "introducer.furl"), "w")
111             f.write(self.introducer_furl)
112             f.close()
113             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
114             f.write(self.vdrive_furl)
115             f.close()
116             if self.discard_shares:
117                 # for this test, we tell the storage servers to throw out all
118                 # their stored data, since we're only testing upload and not
119                 # download.
120                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
121                 f.write("no_storage\n")
122                 f.close()
123             c = self.add_service(client.Client(basedir=nodedir))
124             self.nodes.append(c)
125         # the peers will start running, eventually they will connect to each
126         # other and the introducer_and_vdrive
127
128     def touch_keepalive(self):
129         f = open(self.keepalive_file, "w")
130         f.write("""\
131 If the node notices this file at startup, it will poll every 5 seconds and
132 terminate if the file is more than 10 seconds old, or if it has been deleted.
133 If the test harness has an internal failure and neglects to kill off the node
134 itself, this helps to avoid leaving processes lying around. The contents of
135 this file are ignored.
136         """)
137         f.close()
138
139     def start_client(self):
140         # this returns a Deferred that fires with the client's control.furl
141         log.msg("MAKING CLIENT")
142         clientdir = self.clientdir = os.path.join(self.basedir, "client")
143         quiet = StringIO()
144         create_node.create_client(clientdir, {}, out=quiet)
145         log.msg("DONE MAKING CLIENT")
146         f = open(os.path.join(clientdir, "introducer.furl"), "w")
147         f.write(self.introducer_furl + "\n")
148         f.close()
149         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
150         f.write(self.vdrive_furl + "\n")
151         f.close()
152         f = open(os.path.join(clientdir, "webport"), "w")
153         # TODO: ideally we would set webport=0 and then ask the node what
154         # port it picked. But at the moment it is not convenient to do this,
155         # so we just pick a relatively unique one.
156         webport = max(os.getpid(), 2000)
157         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
158         f.close()
159         self.webish_url = "http://localhost:%d" % webport
160         if self.discard_shares:
161             f = open(os.path.join(clientdir, "debug_no_storage"), "w")
162             f.write("no_storage\n")
163             f.close()
164         self.keepalive_file = os.path.join(clientdir,
165                                            "suicide_prevention_hotline")
166         # now start updating the mtime.
167         self.touch_keepalive()
168         ts = internet.TimerService(4.0, self.touch_keepalive)
169         ts.setServiceParent(self.sparent)
170
171         pp = ClientWatcher()
172         self.proc_done = pp.d = defer.Deferred()
173         cmd = ["twistd", "-y", "client.tac"]
174         env = os.environ.copy()
175         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
176         log.msg("CLIENT STARTED")
177
178         # now we wait for the client to get started. we're looking for the
179         # control.furl file to appear.
180         furl_file = os.path.join(clientdir, "control.furl")
181         def _check():
182             return os.path.exists(furl_file)
183         d = self.poll(_check, 0.1)
184         # once it exists, wait a moment before we read from it, just in case
185         # it hasn't finished writing the whole thing. Ideally control.furl
186         # would be created in some atomic fashion, or made non-readable until
187         # it's ready, but I can't think of an easy way to do that, and I
188         # think the chances that we'll observe a half-write are pretty low.
189         def _stall(res):
190             d2 = defer.Deferred()
191             reactor.callLater(0.1, d2.callback, None)
192             return d2
193         d.addCallback(_stall)
194         def _read(res):
195             f = open(furl_file, "r")
196             furl = f.read()
197             return furl.strip()
198         d.addCallback(_read)
199         return d
200
201
202     def kill_client(self):
203         # returns a Deferred that fires when the process exits. This may only
204         # be called once.
205         try:
206             self.proc.signalProcess("KILL")
207         except error.ProcessExitedAlready:
208             pass
209         return self.proc_done
210
211
212     def create_data(self, name, size):
213         filename = os.path.join(self.basedir, name + ".data")
214         f = open(filename, "wb")
215         block = "a" * 8192
216         while size > 0:
217             l = min(size, 8192)
218             f.write(block[:l])
219             size -= l
220         return filename
221
222     def stash_stats(self, stats, name):
223         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
224         self.stats[name] = stats['VmPeak']
225
226     def POST(self, urlpath, **fields):
227         url = self.webish_url + urlpath
228         sepbase = "boogabooga"
229         sep = "--" + sepbase
230         form = []
231         form.append(sep)
232         form.append('Content-Disposition: form-data; name="_charset"')
233         form.append('')
234         form.append('UTF-8')
235         form.append(sep)
236         for name, value in fields.iteritems():
237             if isinstance(value, tuple):
238                 filename, value = value
239                 form.append('Content-Disposition: form-data; name="%s"; '
240                             'filename="%s"' % (name, filename))
241             else:
242                 form.append('Content-Disposition: form-data; name="%s"' % name)
243             form.append('')
244             form.append(value)
245             form.append(sep)
246         form[-1] += "--"
247         body = "\r\n".join(form) + "\r\n"
248         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
249                    }
250         return getPage(url, method="POST", postdata=body,
251                        headers=headers, followRedirect=False)
252
253     def do_test(self):
254         #print "CLIENT STARTED"
255         #print "FURL", self.control_furl
256         #print "RREF", self.control_rref
257         #print
258         kB = 1000; MB = 1000*1000
259         files = {}
260         uris = {}
261         control = self.control_rref
262
263         def _print_usage(res=None):
264             d = control.callRemote("get_memory_usage")
265             def _print(stats):
266                 print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
267                                                     stats["VmPeak"])
268                 return stats
269             d.addCallback(_print)
270             return d
271
272         def _do_upload(res, size):
273             name = '%d' % size
274             print
275             print "uploading %s" % name
276             if self.mode == "upload":
277                 files[name] = self.create_data(name, size)
278                 d = control.callRemote("upload_from_file_to_uri", files[name])
279                 def _done(uri):
280                     os.remove(files[name])
281                     del files[name]
282                     return uri
283                 d.addCallback(_done)
284             elif self.mode == "upload-POST":
285                 data = "a" * size
286                 url = "/vdrive/global"
287                 d = self.POST(url, t="upload", file=("%d.data" % size, data))
288             else:
289                 raise RuntimeError("unknown mode=%s" % self.mode)
290             def _complete(uri):
291                 uris[name] = uri
292                 print "uploaded %s" % name
293             d.addCallback(_complete)
294             return d
295
296         d = _print_usage()
297         d.addCallback(self.stash_stats, "0B")
298
299         for i in range(10):
300             d.addCallback(_do_upload, size=10*kB+i)
301             d.addCallback(_print_usage)
302         d.addCallback(self.stash_stats, "10kB")
303
304         for i in range(3):
305             d.addCallback(_do_upload, size=10*MB+i)
306             d.addCallback(_print_usage)
307         d.addCallback(self.stash_stats, "10MB")
308
309         for i in range(3):
310             d.addCallback(_do_upload, size=50*MB+i)
311             d.addCallback(_print_usage)
312         d.addCallback(self.stash_stats, "50MB")
313
314         for i in range(1):
315             d.addCallback(_do_upload, size=100*MB+i)
316             d.addCallback(_print_usage)
317         d.addCallback(self.stash_stats, "100MB")
318
319         #d.addCallback(self.stall)
320         def _done(res):
321             print "FINISHING"
322         d.addCallback(_done)
323         return d
324
325     def stall(self, res):
326         d = defer.Deferred()
327         reactor.callLater(5, d.callback, None)
328         return d
329
330
class ClientWatcher(protocol.ProcessProtocol):
    """Process protocol for the spawned client node.

    Echoes the child's stdout and stderr to our stdout and fires self.d
    with None when the process ends. The d attribute is not created here;
    whoever instantiates the watcher must assign a Deferred to it.
    """

    def outReceived(self, data):
        print("OUT: %s" % (data,))

    def errReceived(self, data):
        print("ERR: %s" % (data,))

    def processEnded(self, reason):
        # reason is not inspected: any kind of exit counts as "done"
        finished = self.d
        finished.callback(None)
338
339
if __name__ == '__main__':
    # usage: check_memory.py [mode] -- mode defaults to "upload"
    args = sys.argv[1:]
    mode = "upload"
    if args:
        mode = args[0]
    framework = SystemFramework("_test_memory", mode)
    framework.run()
346