]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check-memory: add 'receive' mode, for #97 (consumption during share receive
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys, urllib
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web import client as tw_client
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil, fileutil
11 import foolscap
12 from foolscap import eventual
13 from twisted.python import log
14
15 class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
16     full_speed_ahead = False
17     _bytes_so_far = 0
18     stalled = None
19     def handleResponsePart(self, data):
20         self._bytes_so_far += len(data)
21         if not self.factory.do_stall:
22             return
23         if self.full_speed_ahead:
24             return
25         if self._bytes_so_far > 1e6+100:
26             if not self.stalled:
27                 print "STALLING"
28                 self.transport.pauseProducing()
29                 self.stalled = reactor.callLater(10.0, self._resume_speed)
30     def _resume_speed(self):
31         print "RESUME SPEED"
32         self.stalled = None
33         self.full_speed_ahead = True
34         self.transport.resumeProducing()
35     def handleResponseEnd(self):
36         if self.stalled:
37             print "CANCEL"
38             self.stalled.cancel()
39             self.stalled = None
40         return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTP client factory whose connections use the stallable, discarding
    page getter as their protocol."""

    protocol = StallableHTTPGetterDiscarder
44
def discardPage(url, stall=False, *args, **kwargs):
    """Begin fetching ``url`` and return the factory's Deferred.

    The response is handled by StallableHTTPGetterDiscarder. When ``stall``
    is true, that protocol pauses the pipe after roughly the first 1MB,
    waits 10 seconds, and then resumes downloading (and discarding) the
    remainder. Extra positional and keyword arguments are handed to the
    client factory.
    """
    # Modelled on twisted.web.client.getPage, which cannot simply be
    # wrapped or subclassed: it offers no hook for replacing the
    # HTTPClientFactory it builds.
    scheme, host, port, _path = tw_client._parse(url)
    getter_factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    getter_factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, getter_factory)
    return getter_factory.deferred
58
59 class SystemFramework(testutil.PollMixin):
60     numnodes = 5
61
62     def __init__(self, basedir, mode):
63         self.basedir = basedir = os.path.abspath(basedir)
64         if not basedir.startswith(os.path.abspath(".")):
65             raise AssertionError("safety issue: basedir must be a subdir")
66         self.testdir = testdir = os.path.join(basedir, "test")
67         if os.path.exists(testdir):
68             shutil.rmtree(testdir)
69         fileutil.make_dirs(testdir)
70         self.sparent = service.MultiService()
71         self.sparent.startService()
72         self.proc = None
73         self.tub = foolscap.Tub()
74         self.tub.setServiceParent(self.sparent)
75         self.mode = mode
76         self.failed = False
77
78     def run(self):
79         log.startLogging(open(os.path.join(self.testdir, "log"), "w"),
80                          setStdout=False)
81         #logfile = open(os.path.join(self.testdir, "log"), "w")
82         #flo = log.FileLogObserver(logfile)
83         #log.startLoggingWithObserver(flo.emit, setStdout=False)
84         d = eventual.fireEventually()
85         d.addCallback(lambda res: self.setUp())
86         d.addCallback(lambda res: self.do_test())
87         d.addBoth(self.tearDown)
88         def _err(err):
89             self.failed = err
90             log.err(err)
91             print err
92         d.addErrback(_err)
93         def _done(res):
94             reactor.stop()
95             return res
96         d.addBoth(_done)
97         reactor.run()
98         if self.failed:
99             self.failed.raiseException()
100
101     def setUp(self):
102         #print "STARTING"
103         self.stats = {}
104         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
105         d = self.make_introducer_and_vdrive()
106         def _more(res):
107             self.make_nodes()
108             return self.start_client()
109         d.addCallback(_more)
110         def _record_control_furl(control_furl):
111             self.control_furl = control_furl
112             #print "OBTAINING '%s'" % (control_furl,)
113             return self.tub.getReference(self.control_furl)
114         d.addCallback(_record_control_furl)
115         def _record_control(control_rref):
116             self.control_rref = control_rref
117             return control_rref.callRemote("wait_for_client_connections",
118                                            self.numnodes+1)
119         d.addCallback(_record_control)
120         def _ready(res):
121             #print "CLIENT READY"
122             pass
123         d.addCallback(_ready)
124         return d
125
126     def tearDown(self, passthrough):
127         # the client node will shut down in a few seconds
128         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
129         log.msg("shutting down SystemTest services")
130         d = defer.succeed(None)
131         if self.proc:
132             d.addCallback(lambda res: self.kill_client())
133         d.addCallback(lambda res: self.sparent.stopService())
134         d.addCallback(lambda res: eventual.flushEventualQueue())
135         def _close_statsfile(res):
136             self.statsfile.close()
137         d.addCallback(_close_statsfile)
138         d.addCallback(lambda res: passthrough)
139         return d
140
141     def add_service(self, s):
142         s.setServiceParent(self.sparent)
143         return s
144
145     def make_introducer_and_vdrive(self):
146         iv_basedir = os.path.join(self.testdir, "introducer_and_vdrive")
147         os.mkdir(iv_basedir)
148         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
149         self.introducer_and_vdrive = self.add_service(iv)
150         d = self.introducer_and_vdrive.when_tub_ready()
151         return d
152
153     def make_nodes(self):
154         q = self.introducer_and_vdrive
155         self.introducer_furl = q.urls["introducer"]
156         self.vdrive_furl = q.urls["vdrive"]
157         self.nodes = []
158         for i in range(self.numnodes):
159             nodedir = os.path.join(self.testdir, "node%d" % i)
160             os.mkdir(nodedir)
161             f = open(os.path.join(nodedir, "introducer.furl"), "w")
162             f.write(self.introducer_furl)
163             f.close()
164             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
165             f.write(self.vdrive_furl)
166             f.close()
167             # the only tests for which we want the internal nodes to actually
168             # retain shares are the ones where somebody's going to download
169             # them.
170             if self.mode in ("download", "download-GET", "download-GET-slow"):
171                 # retain shares
172                 pass
173             else:
174                 # for these tests, we tell the storage servers to pretend to
175                 # accept shares, but really just throw them out, since we're
176                 # only testing upload and not download.
177                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
178                 f.write("no_storage\n")
179                 f.close()
180             if self.mode in ("receive",):
181                 # for this mode, the client-under-test gets all the shares,
182                 # so our internal nodes can refuse requests
183                 f = open(os.path.join(nodedir, "sizelimit"), "w")
184                 f.write("0\n")
185                 f.close()
186             c = self.add_service(client.Client(basedir=nodedir))
187             self.nodes.append(c)
188         # the peers will start running, eventually they will connect to each
189         # other and the introducer_and_vdrive
190
191     def touch_keepalive(self):
192         f = open(self.keepalive_file, "w")
193         f.write("""\
194 If the node notices this file at startup, it will poll every 5 seconds and
195 terminate if the file is more than 10 seconds old, or if it has been deleted.
196 If the test harness has an internal failure and neglects to kill off the node
197 itself, this helps to avoid leaving processes lying around. The contents of
198 this file are ignored.
199         """)
200         f.close()
201
202     def start_client(self):
203         # this returns a Deferred that fires with the client's control.furl
204         log.msg("MAKING CLIENT")
205         clientdir = self.clientdir = os.path.join(self.testdir, "client")
206         quiet = StringIO()
207         create_node.create_client(clientdir, {}, out=quiet)
208         log.msg("DONE MAKING CLIENT")
209         f = open(os.path.join(clientdir, "introducer.furl"), "w")
210         f.write(self.introducer_furl + "\n")
211         f.close()
212         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
213         f.write(self.vdrive_furl + "\n")
214         f.close()
215         f = open(os.path.join(clientdir, "webport"), "w")
216         # TODO: ideally we would set webport=0 and then ask the node what
217         # port it picked. But at the moment it is not convenient to do this,
218         # so we just pick a relatively unique one.
219         webport = max(os.getpid(), 2000)
220         f.write("tcp:%d:interface=127.0.0.1\n" % webport)
221         f.close()
222         self.webish_url = "http://localhost:%d" % webport
223         if self.mode in ("upload-self", "receive"):
224             # accept and store shares, to trigger the memory consumption bugs
225             pass
226         else:
227             # don't accept any shares
228             f = open(os.path.join(clientdir, "sizelimit"), "w")
229             f.write("0\n")
230             f.close()
231             ## also, if we do receive any shares, throw them away
232             #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
233             #f.write("no_storage\n")
234             #f.close()
235         if self.mode == "upload-self":
236             f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
237             f.write("push_to_ourselves\n")
238             f.close()
239         self.keepalive_file = os.path.join(clientdir,
240                                            "suicide_prevention_hotline")
241         # now start updating the mtime.
242         self.touch_keepalive()
243         ts = internet.TimerService(4.0, self.touch_keepalive)
244         ts.setServiceParent(self.sparent)
245
246         pp = ClientWatcher()
247         self.proc_done = pp.d = defer.Deferred()
248         logfile = os.path.join(self.basedir, "client.log")
249         cmd = ["twistd", "-n", "-y", "client.tac", "-l", logfile]
250         env = os.environ.copy()
251         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
252         log.msg("CLIENT STARTED")
253
254         # now we wait for the client to get started. we're looking for the
255         # control.furl file to appear.
256         furl_file = os.path.join(clientdir, "control.furl")
257         def _check():
258             if pp.ended and pp.ended.value.status != 0:
259                 # the twistd process ends normally (with rc=0) if the child
260                 # is successfully launched. It ends abnormally (with rc!=0)
261                 # if the child cannot be launched.
262                 raise RuntimeError("process ended while waiting for startup")
263             return os.path.exists(furl_file)
264         d = self.poll(_check, 0.1)
265         # once it exists, wait a moment before we read from it, just in case
266         # it hasn't finished writing the whole thing. Ideally control.furl
267         # would be created in some atomic fashion, or made non-readable until
268         # it's ready, but I can't think of an easy way to do that, and I
269         # think the chances that we'll observe a half-write are pretty low.
270         def _stall(res):
271             d2 = defer.Deferred()
272             reactor.callLater(0.1, d2.callback, None)
273             return d2
274         d.addCallback(_stall)
275         def _read(res):
276             f = open(furl_file, "r")
277             furl = f.read()
278             return furl.strip()
279         d.addCallback(_read)
280         return d
281
282
283     def kill_client(self):
284         # returns a Deferred that fires when the process exits. This may only
285         # be called once.
286         try:
287             self.proc.signalProcess("INT")
288         except error.ProcessExitedAlready:
289             pass
290         return self.proc_done
291
292
293     def create_data(self, name, size):
294         filename = os.path.join(self.testdir, name + ".data")
295         f = open(filename, "wb")
296         block = "a" * 8192
297         while size > 0:
298             l = min(size, 8192)
299             f.write(block[:l])
300             size -= l
301         return filename
302
303     def stash_stats(self, stats, name):
304         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
305         self.statsfile.flush()
306         self.stats[name] = stats['VmPeak']
307
308     def POST(self, urlpath, **fields):
309         url = self.webish_url + urlpath
310         sepbase = "boogabooga"
311         sep = "--" + sepbase
312         form = []
313         form.append(sep)
314         form.append('Content-Disposition: form-data; name="_charset"')
315         form.append('')
316         form.append('UTF-8')
317         form.append(sep)
318         for name, value in fields.iteritems():
319             if isinstance(value, tuple):
320                 filename, value = value
321                 form.append('Content-Disposition: form-data; name="%s"; '
322                             'filename="%s"' % (name, filename))
323             else:
324                 form.append('Content-Disposition: form-data; name="%s"' % name)
325             form.append('')
326             form.append(value)
327             form.append(sep)
328         form[-1] += "--"
329         body = "\r\n".join(form) + "\r\n"
330         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
331                    }
332         return tw_client.getPage(url, method="POST", postdata=body,
333                                  headers=headers, followRedirect=False)
334
335     def GET_discard(self, urlpath, stall):
336         url = self.webish_url + urlpath + "?filename=dummy-get.out"
337         return discardPage(url, stall)
338
339     def _print_usage(self, res=None):
340         d = self.control_rref.callRemote("get_memory_usage")
341         def _print(stats):
342             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
343                                                 stats["VmPeak"])
344             return stats
345         d.addCallback(_print)
346         return d
347
348     def _do_upload(self, res, size, files, uris):
349         name = '%d' % size
350         print
351         print "uploading %s" % name
352         if self.mode in ("upload", "upload-self"):
353             files[name] = self.create_data(name, size)
354             d = self.control_rref.callRemote("upload_from_file_to_uri",
355                                              files[name])
356             def _done(uri):
357                 os.remove(files[name])
358                 del files[name]
359                 return uri
360             d.addCallback(_done)
361         elif self.mode == "upload-POST":
362             data = "a" * size
363             url = "/vdrive/global"
364             d = self.POST(url, t="upload", file=("%d.data" % size, data))
365         elif self.mode in ("receive",):
366             # upload the data from a local peer, so that the
367             # client-under-test receives and stores the shares
368             files[name] = self.create_data(name, size)
369             u = self.nodes[0].getServiceNamed("uploader")
370             d = u.upload_filename(files[name])
371         elif self.mode in ("download", "download-GET", "download-GET-slow"):
372             # upload the data from a local peer, then have the
373             # client-under-test download it.
374             files[name] = self.create_data(name, size)
375             u = self.nodes[0].getServiceNamed("uploader")
376             d = u.upload_filename(files[name])
377         else:
378             raise RuntimeError("unknown mode=%s" % self.mode)
379         def _complete(uri):
380             uris[name] = uri
381             print "uploaded %s" % name
382         d.addCallback(_complete)
383         return d
384
385     def _do_download(self, res, size, uris):
386         if self.mode not in ("download", "download-GET", "download-GET-slow"):
387             return
388         name = '%d' % size
389         print "downloading %s" % name
390         uri = uris[name]
391
392         if self.mode == "download":
393             d = self.control_rref.callRemote("download_from_uri_to_file",
394                                              uri, "dummy.out")
395         elif self.mode == "download-GET":
396             url = "/uri/%s" % uri
397             d = self.GET_discard(urllib.quote(url), stall=False)
398         elif self.mode == "download-GET-slow":
399             url = "/uri/%s" % uri
400             d = self.GET_discard(urllib.quote(url), stall=True)
401
402         def _complete(res):
403             print "downloaded %s" % name
404             return res
405         d.addCallback(_complete)
406         return d
407
408     def do_test(self):
409         #print "CLIENT STARTED"
410         #print "FURL", self.control_furl
411         #print "RREF", self.control_rref
412         #print
413         kB = 1000; MB = 1000*1000
414         files = {}
415         uris = {}
416
417         d = self._print_usage()
418         d.addCallback(self.stash_stats, "0B")
419
420         for i in range(10):
421             d.addCallback(self._do_upload, 10*kB+i, files, uris)
422             d.addCallback(self._do_download, 10*kB+i, uris)
423             d.addCallback(self._print_usage)
424         d.addCallback(self.stash_stats, "10kB")
425
426         for i in range(3):
427             d.addCallback(self._do_upload, 10*MB+i, files, uris)
428             d.addCallback(self._do_download, 10*MB+i, uris)
429             d.addCallback(self._print_usage)
430         d.addCallback(self.stash_stats, "10MB")
431
432         for i in range(1):
433             d.addCallback(self._do_upload, 50*MB+i, files, uris)
434             d.addCallback(self._do_download, 50*MB+i, uris)
435             d.addCallback(self._print_usage)
436         d.addCallback(self.stash_stats, "50MB")
437
438         #for i in range(1):
439         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
440         #    d.addCallback(self._do_download, 100*MB+i, uris)
441         #    d.addCallback(self._print_usage)
442         #d.addCallback(self.stash_stats, "100MB")
443
444         #d.addCallback(self.stall)
445         def _done(res):
446             print "FINISHING"
447         d.addCallback(_done)
448         return d
449
450     def stall(self, res):
451         d = defer.Deferred()
452         reactor.callLater(5, d.callback, None)
453         return d
454
455
456 class ClientWatcher(protocol.ProcessProtocol):
457     ended = False
458     def outReceived(self, data):
459         print "OUT:", data
460     def errReceived(self, data):
461         print "ERR:", data
462     def processEnded(self, reason):
463         self.ended = reason
464         self.d.callback(None)
465
466
if __name__ == '__main__':
    # the mode is the optional first command-line argument
    mode = "upload"
    extra_args = sys.argv[1:]
    if extra_args:
        mode = extra_args[0]
    # The logfile and stats.out go in _test_memory/ and stick around. The
    # nodes and other files go in _test_memory/test/ and are removed each
    # time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()
476