#! /usr/bin/env python

import os, shutil, sys, urllib, time, stat
from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
from allmydata import client, introducer, upload
from allmydata.scripts import create_node
from allmydata.util import testutil, fileutil
import foolscap
from foolscap import eventual
from twisted.python import log

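# StallableHTTPGetterDiscarder fetches a page and throws the body away. When
# the factory's do_stall flag is set, it pauses the TCP transport after
# roughly 1MB has arrived and resumes ten seconds later, so the slow-download
# mode can observe how the node under test buffers data for a stalled reader.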
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    full_speed_ahead = False
    _bytes_so_far = 0
    stalled = None
    def handleResponsePart(self, data):
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                print "STALLING"
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)
    def _resume_speed(self):
        print "RESUME SPEED"
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()
    def handleResponseEnd(self):
        if self.stalled:
            print "CANCEL"
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)

class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    protocol = StallableHTTPGetterDiscarder

def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred

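# SystemFramework drives the whole memory test: it builds an introducer and a
# set of in-process storage nodes, spawns the client-under-test as a separate
# twistd process, and records that process's memory usage (queried over its
# control port) after each batch of uploads/downloads.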
class SystemFramework(testutil.PollMixin):
    numnodes = 5

    def __init__(self, basedir, mode):
        self.basedir = basedir = os.path.abspath(basedir)
        if not basedir.startswith(os.path.abspath(".")):
            raise AssertionError("safety issue: basedir must be a subdir")
        self.testdir = testdir = os.path.join(basedir, "test")
        if os.path.exists(testdir):
            shutil.rmtree(testdir)
        fileutil.make_dirs(testdir)
        self.sparent = service.MultiService()
        self.sparent.startService()
        self.proc = None
        self.tub = foolscap.Tub()
        self.tub.setServiceParent(self.sparent)
        self.mode = mode
        self.failed = False
        self.keepalive_file = None

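    # run() is the top-level driver: set everything up, run do_test(), then
    # tear down and stop the reactor; if anything failed along the way, the
    # failure is re-raised after the reactor has stopped.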
    def run(self):
        framelog = os.path.join(self.basedir, "driver.log")
        log.startLogging(open(framelog, "a"), setStdout=False)
        log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
        #logfile = open(os.path.join(self.testdir, "log"), "w")
        #flo = log.FileLogObserver(logfile)
        #log.startLoggingWithObserver(flo.emit, setStdout=False)
        d = eventual.fireEventually()
        d.addCallback(lambda res: self.setUp())
        d.addCallback(lambda res: self.record_initial_memusage())
        d.addCallback(lambda res: self.make_nodes())
        d.addCallback(lambda res: self.wait_for_client_connected())
        d.addCallback(lambda res: self.do_test())
        d.addBoth(self.tearDown)
        def _err(err):
            self.failed = err
            log.err(err)
            print err
        d.addErrback(_err)
        def _done(res):
            reactor.stop()
            return res
        d.addBoth(_done)
        reactor.run()
        if self.failed:
            # raiseException doesn't work for CopiedFailures
            self.failed.raiseException()

    def setUp(self):
        #print "STARTING"
        self.stats = {}
        self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
        d = self.make_introducer()
        def _more(res):
            return self.start_client()
        d.addCallback(_more)
        def _record_control_furl(control_furl):
            self.control_furl = control_furl
            #print "OBTAINING '%s'" % (control_furl,)
            return self.tub.getReference(self.control_furl)
        d.addCallback(_record_control_furl)
        def _record_control(control_rref):
            self.control_rref = control_rref
        d.addCallback(_record_control)
        def _ready(res):
            #print "CLIENT READY"
            pass
        d.addCallback(_ready)
        return d

    def record_initial_memusage(self):
        print
        print "Client started (no connections yet)"
        d = self._print_usage()
        d.addCallback(self.stash_stats, "init")
        return d

    def wait_for_client_connected(self):
        print
        print "Client connecting to other nodes.."
        return self.control_rref.callRemote("wait_for_client_connections",
                                            self.numnodes+1)

    def tearDown(self, passthrough):
        # the client node will shut down in a few seconds
        #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
        log.msg("shutting down SystemTest services")
        if self.keepalive_file and os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("keepalive file at shutdown was %ds old" % age)
        d = defer.succeed(None)
        if self.proc:
            d.addCallback(lambda res: self.kill_client())
        d.addCallback(lambda res: self.sparent.stopService())
        d.addCallback(lambda res: eventual.flushEventualQueue())
        def _close_statsfile(res):
            self.statsfile.close()
        d.addCallback(_close_statsfile)
        d.addCallback(lambda res: passthrough)
        return d

    def add_service(self, s):
        s.setServiceParent(self.sparent)
        return s

    def make_introducer(self):
        iv_basedir = os.path.join(self.testdir, "introducer")
        os.mkdir(iv_basedir)
        iv = introducer.IntroducerNode(basedir=iv_basedir)
        self.introducer = self.add_service(iv)
        d = self.introducer.when_tub_ready()
        def _introducer_ready(res):
            q = self.introducer
            self.introducer_furl = q.introducer_url
        d.addCallback(_introducer_ready)
        return d

    def make_nodes(self):
        self.nodes = []
        for i in range(self.numnodes):
            nodedir = os.path.join(self.testdir, "node%d" % i)
            os.mkdir(nodedir)
            f = open(os.path.join(nodedir, "introducer.furl"), "w")
            f.write(self.introducer_furl)
            f.close()
            # the only tests for which we want the internal nodes to actually
            # retain shares are the ones where somebody's going to download
            # them.
            if self.mode in ("download", "download-GET", "download-GET-slow"):
                # retain shares
                pass
            else:
                # for these tests, we tell the storage servers to pretend to
                # accept shares, but really just throw them out, since we're
                # only testing upload and not download.
                f = open(os.path.join(nodedir, "debug_no_storage"), "w")
                f.write("no_storage\n")
                f.close()
            if self.mode in ("receive",):
                # for this mode, the client-under-test gets all the shares,
                # so our internal nodes can refuse requests
                f = open(os.path.join(nodedir, "sizelimit"), "w")
                f.write("0\n")
                f.close()
            c = self.add_service(client.Client(basedir=nodedir))
            self.nodes.append(c)
        # the peers will start running, eventually they will connect to each
        # other and the introducer

    def touch_keepalive(self):
        if os.path.exists(self.keepalive_file):
            age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
            log.msg("touching keepalive file, was %ds old" % age)
        f = open(self.keepalive_file, "w")
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
        """)
        f.close()

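    # start_client() creates the client-under-test in its own directory and
    # launches it as a separate "twistd" process, so that its memory usage can
    # be measured independently of this driver. It returns a Deferred that
    # fires with the client's control.furl once the node has started.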
    def start_client(self):
        # this returns a Deferred that fires with the client's control.furl
        log.msg("MAKING CLIENT")
        clientdir = self.clientdir = os.path.join(self.testdir, "client")
        quiet = StringIO()
        create_node.create_client(clientdir, {}, out=quiet)
        log.msg("DONE MAKING CLIENT")
        f = open(os.path.join(clientdir, "introducer.furl"), "w")
        f.write(self.introducer_furl + "\n")
        f.close()
        f = open(os.path.join(clientdir, "webport"), "w")
        # TODO: ideally we would set webport=0 and then ask the node what
        # port it picked. But at the moment it is not convenient to do this,
        # so we just pick a relatively unique one.
        webport = max(os.getpid(), 2000)
        f.write("tcp:%d:interface=127.0.0.1\n" % webport)
        f.close()
        self.webish_url = "http://localhost:%d" % webport
        if self.mode in ("upload-self", "receive"):
            # accept and store shares, to trigger the memory consumption bugs
            pass
        else:
            # don't accept any shares
            f = open(os.path.join(clientdir, "readonly_storage"), "w")
            f.write("true\n")
            f.close()
            ## also, if we do receive any shares, throw them away
            #f = open(os.path.join(clientdir, "debug_no_storage"), "w")
            #f.write("no_storage\n")
            #f.close()
        if self.mode == "upload-self":
            pass
        self.keepalive_file = os.path.join(clientdir,
                                           "suicide_prevention_hotline")
        # now start updating the mtime.
        self.touch_keepalive()
        ts = internet.TimerService(1.0, self.touch_keepalive)
        ts.setServiceParent(self.sparent)

        pp = ClientWatcher()
        self.proc_done = pp.d = defer.Deferred()
        logfile = os.path.join(self.basedir, "client.log")
        cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
        env = os.environ.copy()
        self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
        log.msg("CLIENT STARTED")

        # now we wait for the client to get started. we're looking for the
        # control.furl file to appear.
        furl_file = os.path.join(clientdir, "private", "control.furl")
        def _check():
            if pp.ended and pp.ended.value.status != 0:
                # the twistd process ends normally (with rc=0) if the child
                # is successfully launched. It ends abnormally (with rc!=0)
                # if the child cannot be launched.
                raise RuntimeError("process ended while waiting for startup")
            return os.path.exists(furl_file)
        d = self.poll(_check, 0.1)
        # once it exists, wait a moment before we read from it, just in case
        # it hasn't finished writing the whole thing. Ideally control.furl
        # would be created in some atomic fashion, or made non-readable until
        # it's ready, but I can't think of an easy way to do that, and I
        # think the chances that we'll observe a half-write are pretty low.
        def _stall(res):
            d2 = defer.Deferred()
            reactor.callLater(0.1, d2.callback, None)
            return d2
        d.addCallback(_stall)
        def _read(res):
            f = open(furl_file, "r")
            furl = f.read()
            f.close()
            return furl.strip()
        d.addCallback(_read)
        return d


    def kill_client(self):
        # returns a Deferred that fires when the process exits. This may only
        # be called once.
        try:
            self.proc.signalProcess("INT")
        except error.ProcessExitedAlready:
            pass
        return self.proc_done

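    # create_data() writes a file of exactly `size` bytes (repeated "a"
    # characters) into the test directory and returns its path; the upload
    # modes feed these files to the client and delete them afterwards.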
    def create_data(self, name, size):
        filename = os.path.join(self.testdir, name + ".data")
        f = open(filename, "wb")
        block = "a" * 8192
        while size > 0:
            l = min(size, 8192)
            f.write(block[:l])
            size -= l
        f.close()
        return filename

    def stash_stats(self, stats, name):
        self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
        self.statsfile.flush()
        self.stats[name] = stats['VmPeak']

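    # POST() hand-builds a multipart/form-data body (the webapi upload form)
    # and submits it to the client's web port with twisted.web's getPage; the
    # upload-POST mode uses this to push the whole file through the HTTP
    # frontend.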
    def POST(self, urlpath, **fields):
        url = self.webish_url + urlpath
        sepbase = "boogabooga"
        sep = "--" + sepbase
        form = []
        form.append(sep)
        form.append('Content-Disposition: form-data; name="_charset"')
        form.append('')
        form.append('UTF-8')
        form.append(sep)
        for name, value in fields.iteritems():
            if isinstance(value, tuple):
                filename, value = value
                form.append('Content-Disposition: form-data; name="%s"; '
                            'filename="%s"' % (name, filename))
            else:
                form.append('Content-Disposition: form-data; name="%s"' % name)
            form.append('')
            form.append(value)
            form.append(sep)
        form[-1] += "--"
        body = "\r\n".join(form) + "\r\n"
        headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
                   }
        return tw_client.getPage(url, method="POST", postdata=body,
                                 headers=headers, followRedirect=False)

    def GET_discard(self, urlpath, stall):
        url = self.webish_url + urlpath + "?filename=dummy-get.out"
        return discardPage(url, stall)

    def _print_usage(self, res=None):
        d = self.control_rref.callRemote("get_memory_usage")
        def _print(stats):
            print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
                                                stats["VmPeak"])
            return stats
        d.addCallback(_print)
        return d

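    # _do_upload() pushes one file of the given size at the client, using
    # whichever path the current mode calls for: a control-port upload from a
    # local file ("upload"/"upload-self"), a webapi POST ("upload-POST"), or
    # an upload from one of the internal nodes so that the client-under-test
    # only receives shares ("receive" and the "download*" modes).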
    def _do_upload(self, res, size, files, uris):
        name = '%d' % size
        print
        print "uploading %s" % name
        if self.mode in ("upload", "upload-self"):
            files[name] = self.create_data(name, size)
            d = self.control_rref.callRemote("upload_from_file_to_uri",
                                             files[name])
            def _done(uri):
                os.remove(files[name])
                del files[name]
                return uri
            d.addCallback(_done)
        elif self.mode == "upload-POST":
            data = "a" * size
            url = "/uri"
            d = self.POST(url, t="upload", file=("%d.data" % size, data))
        elif self.mode in ("receive",
                           "download", "download-GET", "download-GET-slow"):
            # mode=receive: upload the data from a local peer, so that the
            # client-under-test receives and stores the shares
            #
            # mode=download*: upload the data from a local peer, then have
            # the client-under-test download it.
            #
            # we need to wait until the uploading node has connected to all
            # peers, since the wait_for_client_connections() above doesn't
            # pay attention to our self.nodes[] and their connections.
            files[name] = self.create_data(name, size)
            u = self.nodes[0].getServiceNamed("uploader")
            d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
            d.addCallback(lambda res: u.upload(upload.FileName(files[name])))
        else:
            raise RuntimeError("unknown mode=%s" % self.mode)
        def _complete(uri):
            uris[name] = uri
            print "uploaded %s" % name
        d.addCallback(_complete)
        return d

    def _do_download(self, res, size, uris):
        if self.mode not in ("download", "download-GET", "download-GET-slow"):
            return
        name = '%d' % size
        print "downloading %s" % name
        uri = uris[name]

        if self.mode == "download":
            d = self.control_rref.callRemote("download_from_uri_to_file",
                                             uri, "dummy.out")
        elif self.mode == "download-GET":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=False)
        elif self.mode == "download-GET-slow":
            url = "/uri/%s" % uri
            d = self.GET_discard(urllib.quote(url), stall=True)

        def _complete(res):
            print "downloaded %s" % name
            return res
        d.addCallback(_complete)
        return d

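    # do_test() runs the measurement loop: record a baseline, then upload
    # (and, in the download modes, download) ten ~10kB files, three ~10MB
    # files, and one ~50MB file, printing and stashing VmSize/VmPeak after
    # each batch.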
    def do_test(self):
        #print "CLIENT STARTED"
        #print "FURL", self.control_furl
        #print "RREF", self.control_rref
        #print
        kB = 1000; MB = 1000*1000
        files = {}
        uris = {}

        d = self._print_usage()
        d.addCallback(self.stash_stats, "0B")

        for i in range(10):
            d.addCallback(self._do_upload, 10*kB+i, files, uris)
            d.addCallback(self._do_download, 10*kB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10kB")

        for i in range(3):
            d.addCallback(self._do_upload, 10*MB+i, files, uris)
            d.addCallback(self._do_download, 10*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "10MB")

        for i in range(1):
            d.addCallback(self._do_upload, 50*MB+i, files, uris)
            d.addCallback(self._do_download, 50*MB+i, uris)
            d.addCallback(self._print_usage)
        d.addCallback(self.stash_stats, "50MB")

        #for i in range(1):
        #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
        #    d.addCallback(self._do_download, 100*MB+i, uris)
        #    d.addCallback(self._print_usage)
        #d.addCallback(self.stash_stats, "100MB")

        #d.addCallback(self.stall)
        def _done(res):
            print "FINISHING"
        d.addCallback(_done)
        return d

    def stall(self, res):
        d = defer.Deferred()
        reactor.callLater(5, d.callback, None)
        return d


class ClientWatcher(protocol.ProcessProtocol):
    ended = False
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        self.ended = reason
        self.d.callback(None)

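# Usage: check_memory.py [mode], where mode is one of "upload" (the default),
# "upload-self", "upload-POST", "receive", "download", "download-GET", or
# "download-GET-slow".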
if __name__ == '__main__':
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    sf = SystemFramework("_test_memory", mode)
    sf.run()