# src/allmydata/test/check_memory.py (tahoe-lafs)
# Last change: "test/check_memory.py: oops, fix one last ur.uri -> ur.get_uri()"
1 import os, shutil, sys, urllib, time, stat
2 from cStringIO import StringIO
3 from twisted.internet import defer, reactor, protocol, error
4 from twisted.application import service, internet
5 from twisted.web import client as tw_client
6 from allmydata import client, introducer
7 from allmydata.immutable import upload
8 from allmydata.scripts import create_node
9 from allmydata.util import fileutil, pollmixin
10 from allmydata.util.fileutil import abspath_expanduser_unicode
11 from allmydata.util.encodingutil import get_filesystem_encoding
12 from foolscap.api import Tub, fireEventually, flushEventualQueue
13 from twisted.python import log
14
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
    """Page getter that discards the body and can stall mid-download.

    When the factory's do_stall flag is set, the transport's producer is
    paused once a bit more than 1MB of body has arrived, then resumed 10
    seconds later and allowed to run at full speed for the rest of the
    response. Used to observe server-side memory while a slow client is
    attached.
    """
    full_speed_ahead = False  # True once the one-time stall has been served
    _bytes_so_far = 0         # count of body bytes received (data is discarded)
    stalled = None            # pending IDelayedCall for the resume, or None
    def handleResponsePart(self, data):
        # Count incoming body data; after ~1MB, pause the producer once.
        self._bytes_so_far += len(data)
        if not self.factory.do_stall:
            return
        if self.full_speed_ahead:
            # the single stall already happened; let the rest stream through
            return
        if self._bytes_so_far > 1e6+100:
            if not self.stalled:
                print "STALLING"
                self.transport.pauseProducing()
                self.stalled = reactor.callLater(10.0, self._resume_speed)
    def _resume_speed(self):
        # Fired 10s after the stall: resume and never stall again.
        print "RESUME SPEED"
        self.stalled = None
        self.full_speed_ahead = True
        self.transport.resumeProducing()
    def handleResponseEnd(self):
        # If the response finishes while the resume timer is still pending,
        # cancel it so the reactor isn't left with a stale delayed call.
        if self.stalled:
            print "CANCEL"
            self.stalled.cancel()
            self.stalled = None
        return tw_client.HTTPPageGetter.handleResponseEnd(self)
41
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
    """HTTPClientFactory that builds our stallable, body-discarding getter."""
    protocol = StallableHTTPGetterDiscarder
44
def discardPage(url, stall=False, *args, **kwargs):
    """Start fetching the URL, but stall our pipe after the first 1MB.
    Wait 10 seconds, then resume downloading (and discarding) everything.

    Returns the factory's Deferred, which fires when the fetch completes.
    Extra args/kwargs are passed through to the HTTPClientFactory.
    """
    # adapted from twisted.web.client.getPage . We can't just wrap or
    # subclass because it provides no way to override the HTTPClientFactory
    # that it creates.
    scheme, host, port, path = tw_client._parse(url)
    factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
    factory.do_stall = stall
    # only plain http URLs are supported by this helper
    assert scheme == 'http'
    reactor.connectTCP(host, port, factory)
    return factory.deferred
58
class ChildDidNotStartError(Exception):
    """Raised when the spawned client process exits before startup completes."""
    pass
61
62 class SystemFramework(pollmixin.PollMixin):
63     numnodes = 7
64
65     def __init__(self, basedir, mode):
66         self.basedir = basedir = abspath_expanduser_unicode(unicode(basedir))
67         if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
68             raise AssertionError("safety issue: basedir must be a subdir")
69         self.testdir = testdir = os.path.join(basedir, "test")
70         if os.path.exists(testdir):
71             shutil.rmtree(testdir)
72         fileutil.make_dirs(testdir)
73         self.sparent = service.MultiService()
74         self.sparent.startService()
75         self.proc = None
76         self.tub = Tub()
77         self.tub.setOption("expose-remote-exception-types", False)
78         self.tub.setServiceParent(self.sparent)
79         self.mode = mode
80         self.failed = False
81         self.keepalive_file = None
82
83     def run(self):
84         framelog = os.path.join(self.basedir, "driver.log")
85         log.startLogging(open(framelog, "a"), setStdout=False)
86         log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
87         #logfile = open(os.path.join(self.testdir, "log"), "w")
88         #flo = log.FileLogObserver(logfile)
89         #log.startLoggingWithObserver(flo.emit, setStdout=False)
90         d = fireEventually()
91         d.addCallback(lambda res: self.setUp())
92         d.addCallback(lambda res: self.record_initial_memusage())
93         d.addCallback(lambda res: self.make_nodes())
94         d.addCallback(lambda res: self.wait_for_client_connected())
95         d.addCallback(lambda res: self.do_test())
96         d.addBoth(self.tearDown)
97         def _err(err):
98             self.failed = err
99             log.err(err)
100             print err
101         d.addErrback(_err)
102         def _done(res):
103             reactor.stop()
104             return res
105         d.addBoth(_done)
106         reactor.run()
107         if self.failed:
108             # raiseException doesn't work for CopiedFailures
109             self.failed.raiseException()
110
111     def setUp(self):
112         #print "STARTING"
113         self.stats = {}
114         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
115         d = self.make_introducer()
116         def _more(res):
117             return self.start_client()
118         d.addCallback(_more)
119         def _record_control_furl(control_furl):
120             self.control_furl = control_furl
121             #print "OBTAINING '%s'" % (control_furl,)
122             return self.tub.getReference(self.control_furl)
123         d.addCallback(_record_control_furl)
124         def _record_control(control_rref):
125             self.control_rref = control_rref
126         d.addCallback(_record_control)
127         def _ready(res):
128             #print "CLIENT READY"
129             pass
130         d.addCallback(_ready)
131         return d
132
133     def record_initial_memusage(self):
134         print
135         print "Client started (no connections yet)"
136         d = self._print_usage()
137         d.addCallback(self.stash_stats, "init")
138         return d
139
140     def wait_for_client_connected(self):
141         print
142         print "Client connecting to other nodes.."
143         return self.control_rref.callRemote("wait_for_client_connections",
144                                             self.numnodes+1)
145
146     def tearDown(self, passthrough):
147         # the client node will shut down in a few seconds
148         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
149         log.msg("shutting down SystemTest services")
150         if self.keepalive_file and os.path.exists(self.keepalive_file):
151             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
152             log.msg("keepalive file at shutdown was %ds old" % age)
153         d = defer.succeed(None)
154         if self.proc:
155             d.addCallback(lambda res: self.kill_client())
156         d.addCallback(lambda res: self.sparent.stopService())
157         d.addCallback(lambda res: flushEventualQueue())
158         def _close_statsfile(res):
159             self.statsfile.close()
160         d.addCallback(_close_statsfile)
161         d.addCallback(lambda res: passthrough)
162         return d
163
164     def add_service(self, s):
165         s.setServiceParent(self.sparent)
166         return s
167
168     def make_introducer(self):
169         iv_basedir = os.path.join(self.testdir, "introducer")
170         os.mkdir(iv_basedir)
171         iv = introducer.IntroducerNode(basedir=iv_basedir)
172         self.introducer = self.add_service(iv)
173         d = self.introducer.when_tub_ready()
174         def _introducer_ready(res):
175             q = self.introducer
176             self.introducer_furl = q.introducer_url
177         d.addCallback(_introducer_ready)
178         return d
179
180     def make_nodes(self):
181         self.nodes = []
182         for i in range(self.numnodes):
183             nodedir = os.path.join(self.testdir, "node%d" % i)
184             os.mkdir(nodedir)
185             f = open(os.path.join(nodedir, "tahoe.cfg"), "w")
186             f.write("[client]\n"
187                     "introducer.furl = %s\n"
188                     "shares.happy = 1\n"
189                     "[storage]\n"
190                     % (self.introducer_furl,))
191             # the only tests for which we want the internal nodes to actually
192             # retain shares are the ones where somebody's going to download
193             # them.
194             if self.mode in ("download", "download-GET", "download-GET-slow"):
195                 # retain shares
196                 pass
197             else:
198                 # for these tests, we tell the storage servers to pretend to
199                 # accept shares, but really just throw them out, since we're
200                 # only testing upload and not download.
201                 f.write("debug_discard = true\n")
202             if self.mode in ("receive",):
203                 # for this mode, the client-under-test gets all the shares,
204                 # so our internal nodes can refuse requests
205                 f.write("readonly = true\n")
206             f.close()
207             c = self.add_service(client.Client(basedir=nodedir))
208             self.nodes.append(c)
209         # the peers will start running, eventually they will connect to each
210         # other and the introducer
211
212     def touch_keepalive(self):
213         if os.path.exists(self.keepalive_file):
214             age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
215             log.msg("touching keepalive file, was %ds old" % age)
216         f = open(self.keepalive_file, "w")
217         f.write("""\
218 If the node notices this file at startup, it will poll every 5 seconds and
219 terminate if the file is more than 10 seconds old, or if it has been deleted.
220 If the test harness has an internal failure and neglects to kill off the node
221 itself, this helps to avoid leaving processes lying around. The contents of
222 this file are ignored.
223         """)
224         f.close()
225
226     def start_client(self):
227         # this returns a Deferred that fires with the client's control.furl
228         log.msg("MAKING CLIENT")
229         # self.testdir is an absolute Unicode path
230         clientdir = self.clientdir = os.path.join(self.testdir, u"client")
231         clientdir_str = clientdir.encode(get_filesystem_encoding())
232         quiet = StringIO()
233         create_node.create_node({'basedir': clientdir}, out=quiet)
234         log.msg("DONE MAKING CLIENT")
235         # now replace tahoe.cfg
236         # set webport=0 and then ask the node what port it picked.
237         f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
238         f.write("[node]\n"
239                 "web.port = tcp:0:interface=127.0.0.1\n"
240                 "[client]\n"
241                 "introducer.furl = %s\n"
242                 "shares.happy = 1\n"
243                 "[storage]\n"
244                 % (self.introducer_furl,))
245
246         if self.mode in ("upload-self", "receive"):
247             # accept and store shares, to trigger the memory consumption bugs
248             pass
249         else:
250             # don't accept any shares
251             f.write("readonly = true\n")
252             ## also, if we do receive any shares, throw them away
253             #f.write("debug_discard = true")
254         if self.mode == "upload-self":
255             pass
256         f.close()
257         self.keepalive_file = os.path.join(clientdir,
258                                            "suicide_prevention_hotline")
259         # now start updating the mtime.
260         self.touch_keepalive()
261         ts = internet.TimerService(1.0, self.touch_keepalive)
262         ts.setServiceParent(self.sparent)
263
264         pp = ClientWatcher()
265         self.proc_done = pp.d = defer.Deferred()
266         logfile = os.path.join(self.basedir, "client.log")
267         cmd = ["twistd", "-n", "-y", "tahoe-client.tac", "-l", logfile]
268         env = os.environ.copy()
269         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
270         log.msg("CLIENT STARTED")
271
272         # now we wait for the client to get started. we're looking for the
273         # control.furl file to appear.
274         furl_file = os.path.join(clientdir, "private", "control.furl")
275         url_file = os.path.join(clientdir, "node.url")
276         def _check():
277             if pp.ended and pp.ended.value.status != 0:
278                 # the twistd process ends normally (with rc=0) if the child
279                 # is successfully launched. It ends abnormally (with rc!=0)
280                 # if the child cannot be launched.
281                 raise ChildDidNotStartError("process ended while waiting for startup")
282             return os.path.exists(furl_file)
283         d = self.poll(_check, 0.1)
284         # once it exists, wait a moment before we read from it, just in case
285         # it hasn't finished writing the whole thing. Ideally control.furl
286         # would be created in some atomic fashion, or made non-readable until
287         # it's ready, but I can't think of an easy way to do that, and I
288         # think the chances that we'll observe a half-write are pretty low.
289         def _stall(res):
290             d2 = defer.Deferred()
291             reactor.callLater(0.1, d2.callback, None)
292             return d2
293         d.addCallback(_stall)
294         def _read(res):
295             # read the node's URL
296             self.webish_url = open(url_file, "r").read().strip()
297             if self.webish_url[-1] == "/":
298                 # trim trailing slash, since the rest of the code wants it gone
299                 self.webish_url = self.webish_url[:-1]
300             f = open(furl_file, "r")
301             furl = f.read()
302             return furl.strip()
303         d.addCallback(_read)
304         return d
305
306
307     def kill_client(self):
308         # returns a Deferred that fires when the process exits. This may only
309         # be called once.
310         try:
311             self.proc.signalProcess("INT")
312         except error.ProcessExitedAlready:
313             pass
314         return self.proc_done
315
316
317     def create_data(self, name, size):
318         filename = os.path.join(self.testdir, name + ".data")
319         f = open(filename, "wb")
320         block = "a" * 8192
321         while size > 0:
322             l = min(size, 8192)
323             f.write(block[:l])
324             size -= l
325         return filename
326
327     def stash_stats(self, stats, name):
328         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
329         self.statsfile.flush()
330         self.stats[name] = stats['VmPeak']
331
332     def POST(self, urlpath, **fields):
333         url = self.webish_url + urlpath
334         sepbase = "boogabooga"
335         sep = "--" + sepbase
336         form = []
337         form.append(sep)
338         form.append('Content-Disposition: form-data; name="_charset"')
339         form.append('')
340         form.append('UTF-8')
341         form.append(sep)
342         for name, value in fields.iteritems():
343             if isinstance(value, tuple):
344                 filename, value = value
345                 form.append('Content-Disposition: form-data; name="%s"; '
346                             'filename="%s"' % (name, filename))
347             else:
348                 form.append('Content-Disposition: form-data; name="%s"' % name)
349             form.append('')
350             form.append(value)
351             form.append(sep)
352         form[-1] += "--"
353         body = "\r\n".join(form) + "\r\n"
354         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
355                    }
356         return tw_client.getPage(url, method="POST", postdata=body,
357                                  headers=headers, followRedirect=False)
358
359     def GET_discard(self, urlpath, stall):
360         url = self.webish_url + urlpath + "?filename=dummy-get.out"
361         return discardPage(url, stall)
362
363     def _print_usage(self, res=None):
364         d = self.control_rref.callRemote("get_memory_usage")
365         def _print(stats):
366             print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
367                                                 stats["VmPeak"])
368             return stats
369         d.addCallback(_print)
370         return d
371
372     def _do_upload(self, res, size, files, uris):
373         name = '%d' % size
374         print
375         print "uploading %s" % name
376         if self.mode in ("upload", "upload-self"):
377             files[name] = self.create_data(name, size)
378             d = self.control_rref.callRemote("upload_from_file_to_uri",
379                                              files[name].encode("utf-8"),
380                                              convergence="check-memory")
381             def _done(uri):
382                 os.remove(files[name])
383                 del files[name]
384                 return uri
385             d.addCallback(_done)
386         elif self.mode == "upload-POST":
387             data = "a" * size
388             url = "/uri"
389             d = self.POST(url, t="upload", file=("%d.data" % size, data))
390         elif self.mode in ("receive",
391                            "download", "download-GET", "download-GET-slow"):
392             # mode=receive: upload the data from a local peer, so that the
393             # client-under-test receives and stores the shares
394             #
395             # mode=download*: upload the data from a local peer, then have
396             # the client-under-test download it.
397             #
398             # we need to wait until the uploading node has connected to all
399             # peers, since the wait_for_client_connections() above doesn't
400             # pay attention to our self.nodes[] and their connections.
401             files[name] = self.create_data(name, size)
402             u = self.nodes[0].getServiceNamed("uploader")
403             d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
404             d.addCallback(lambda res:
405                           u.upload(upload.FileName(files[name],
406                                                    convergence="check-memory")))
407             d.addCallback(lambda results: results.get_uri())
408         else:
409             raise ValueError("unknown mode=%s" % self.mode)
410         def _complete(uri):
411             uris[name] = uri
412             print "uploaded %s" % name
413         d.addCallback(_complete)
414         return d
415
416     def _do_download(self, res, size, uris):
417         if self.mode not in ("download", "download-GET", "download-GET-slow"):
418             return
419         name = '%d' % size
420         print "downloading %s" % name
421         uri = uris[name]
422
423         if self.mode == "download":
424             d = self.control_rref.callRemote("download_from_uri_to_file",
425                                              uri, "dummy.out")
426         elif self.mode == "download-GET":
427             url = "/uri/%s" % uri
428             d = self.GET_discard(urllib.quote(url), stall=False)
429         elif self.mode == "download-GET-slow":
430             url = "/uri/%s" % uri
431             d = self.GET_discard(urllib.quote(url), stall=True)
432
433         def _complete(res):
434             print "downloaded %s" % name
435             return res
436         d.addCallback(_complete)
437         return d
438
439     def do_test(self):
440         #print "CLIENT STARTED"
441         #print "FURL", self.control_furl
442         #print "RREF", self.control_rref
443         #print
444         kB = 1000; MB = 1000*1000
445         files = {}
446         uris = {}
447
448         d = self._print_usage()
449         d.addCallback(self.stash_stats, "0B")
450
451         for i in range(10):
452             d.addCallback(self._do_upload, 10*kB+i, files, uris)
453             d.addCallback(self._do_download, 10*kB+i, uris)
454             d.addCallback(self._print_usage)
455         d.addCallback(self.stash_stats, "10kB")
456
457         for i in range(3):
458             d.addCallback(self._do_upload, 10*MB+i, files, uris)
459             d.addCallback(self._do_download, 10*MB+i, uris)
460             d.addCallback(self._print_usage)
461         d.addCallback(self.stash_stats, "10MB")
462
463         for i in range(1):
464             d.addCallback(self._do_upload, 50*MB+i, files, uris)
465             d.addCallback(self._do_download, 50*MB+i, uris)
466             d.addCallback(self._print_usage)
467         d.addCallback(self.stash_stats, "50MB")
468
469         #for i in range(1):
470         #    d.addCallback(self._do_upload, 100*MB+i, files, uris)
471         #    d.addCallback(self._do_download, 100*MB+i, uris)
472         #    d.addCallback(self._print_usage)
473         #d.addCallback(self.stash_stats, "100MB")
474
475         #d.addCallback(self.stall)
476         def _done(res):
477             print "FINISHING"
478         d.addCallback(_done)
479         return d
480
481     def stall(self, res):
482         d = defer.Deferred()
483         reactor.callLater(5, d.callback, None)
484         return d
485
486
class ClientWatcher(protocol.ProcessProtocol):
    """Process protocol for the spawned client node.

    Echoes the child's stdout/stderr to our stdout, and fires self.d
    (a Deferred assigned by the creator) when the process ends.
    """
    ended = False  # False until exit, then the Failure passed to processEnded
    def outReceived(self, data):
        print "OUT:", data
    def errReceived(self, data):
        print "ERR:", data
    def processEnded(self, reason):
        # stash the termination reason before signalling completion
        self.ended = reason
        self.d.callback(None)
496
497
if __name__ == '__main__':
    # The test mode comes from the first command-line argument; default is
    # a plain upload test.
    mode = sys.argv[1] if len(sys.argv) > 1 else "upload"
    # put the logfile and stats.out in _test_memory/ . These stick around.
    # put the nodes and other files in _test_memory/test/ . These are
    # removed each time we run.
    SystemFramework("_test_memory", mode).run()
507