]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/check_memory.py
check_memory.py: fix benign-but-noisy vdrive.furl handling bug
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / check_memory.py
1 #! /usr/bin/env python
2
3 import os, shutil, sys
4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from allmydata import client, introducer_and_vdrive
8 from allmydata.scripts import create_node
9 from allmydata.util import testutil
10 import foolscap
11 from foolscap import eventual
12 from twisted.python import log
13
14 class SystemFramework(testutil.PollMixin):
15     numnodes = 5
16
17     def __init__(self, basedir, mode):
18         self.basedir = basedir = os.path.abspath(basedir)
19         if not basedir.startswith(os.path.abspath(".")):
20             raise AssertionError("safety issue: basedir must be a subdir")
21         if os.path.exists(basedir):
22             shutil.rmtree(basedir)
23         os.mkdir(basedir)
24         self.sparent = service.MultiService()
25         self.sparent.startService()
26         self.proc = None
27         self.tub = foolscap.Tub()
28         self.tub.setServiceParent(self.sparent)
29         self.discard_shares = True
30         self.mode = mode
31
32     def run(self):
33         log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
34                          setStdout=False)
35         #logfile = open(os.path.join(self.basedir, "log"), "w")
36         #flo = log.FileLogObserver(logfile)
37         #log.startLoggingWithObserver(flo.emit, setStdout=False)
38         d = eventual.fireEventually()
39         d.addCallback(lambda res: self.setUp())
40         d.addCallback(lambda res: self.do_test())
41         d.addBoth(self.tearDown)
42         def _err(err):
43             log.err(err)
44             print err
45         d.addErrback(_err)
46         d.addBoth(lambda res: reactor.stop())
47         reactor.run()
48
49     def setUp(self):
50         #print "STARTING"
51         self.stats = {}
52         self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
53         d = self.make_introducer_and_vdrive()
54         def _more(res):
55             self.make_nodes()
56             return self.start_client()
57         d.addCallback(_more)
58         def _record_control_furl(control_furl):
59             self.control_furl = control_furl
60             #print "OBTAINING '%s'" % (control_furl,)
61             return self.tub.getReference(self.control_furl)
62         d.addCallback(_record_control_furl)
63         def _record_control(control_rref):
64             self.control_rref = control_rref
65             return control_rref.callRemote("wait_for_client_connections",
66                                            self.numnodes+1)
67         d.addCallback(_record_control)
68         def _ready(res):
69             #print "CLIENT READY"
70             pass
71         d.addCallback(_ready)
72         return d
73
74     def tearDown(self, passthrough):
75         # the client node will shut down in a few seconds
76         #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
77         log.msg("shutting down SystemTest services")
78         d = defer.succeed(None)
79         if self.proc:
80             d.addCallback(lambda res: self.kill_client())
81         d.addCallback(lambda res: self.sparent.stopService())
82         d.addCallback(lambda res: eventual.flushEventualQueue())
83         def _close_statsfile(res):
84             self.statsfile.close()
85         d.addCallback(_close_statsfile)
86         d.addCallback(lambda res: passthrough)
87         return d
88
89     def add_service(self, s):
90         s.setServiceParent(self.sparent)
91         return s
92
93     def make_introducer_and_vdrive(self):
94         iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
95         os.mkdir(iv_basedir)
96         iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
97         self.introducer_and_vdrive = self.add_service(iv)
98         d = self.introducer_and_vdrive.when_tub_ready()
99         return d
100
101     def make_nodes(self):
102         q = self.introducer_and_vdrive
103         self.introducer_furl = q.urls["introducer"]
104         self.vdrive_furl = q.urls["vdrive"]
105         self.nodes = []
106         for i in range(self.numnodes):
107             nodedir = os.path.join(self.basedir, "node%d" % i)
108             os.mkdir(nodedir)
109             f = open(os.path.join(nodedir, "introducer.furl"), "w")
110             f.write(self.introducer_furl)
111             f.close()
112             f = open(os.path.join(nodedir, "vdrive.furl"), "w")
113             f.write(self.vdrive_furl)
114             f.close()
115             if self.discard_shares:
116                 # for this test, we tell the storage servers to throw out all
117                 # their stored data, since we're only testing upload and not
118                 # download.
119                 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
120                 f.write("no_storage\n")
121                 f.close()
122             c = self.add_service(client.Client(basedir=nodedir))
123             self.nodes.append(c)
124         # the peers will start running, eventually they will connect to each
125         # other and the introducer_and_vdrive
126
127     def touch_keepalive(self):
128         f = open(self.keepalive_file, "w")
129         f.write("""\
130 If the node notices this file at startup, it will poll every 5 seconds and
131 terminate if the file is more than 10 seconds old, or if it has been deleted.
132 If the test harness has an internal failure and neglects to kill off the node
133 itself, this helps to avoid leaving processes lying around. The contents of
134 this file are ignored.
135         """)
136         f.close()
137
138     def start_client(self):
139         # this returns a Deferred that fires with the client's control.furl
140         log.msg("MAKING CLIENT")
141         clientdir = self.clientdir = os.path.join(self.basedir, "client")
142         quiet = StringIO()
143         create_node.create_client(clientdir, {}, out=quiet)
144         log.msg("DONE MAKING CLIENT")
145         f = open(os.path.join(clientdir, "introducer.furl"), "w")
146         f.write(self.introducer_furl + "\n")
147         f.close()
148         f = open(os.path.join(clientdir, "vdrive.furl"), "w")
149         f.write(self.vdrive_furl + "\n")
150         f.close()
151         if self.discard_shares:
152             f = open(os.path.join(clientdir, "debug_no_storage"), "w")
153             f.write("no_storage\n")
154             f.close()
155         self.keepalive_file = os.path.join(clientdir,
156                                            "suicide_prevention_hotline")
157         # now start updating the mtime.
158         self.touch_keepalive()
159         ts = internet.TimerService(4.0, self.touch_keepalive)
160         ts.setServiceParent(self.sparent)
161
162         pp = ClientWatcher()
163         self.proc_done = pp.d = defer.Deferred()
164         cmd = ["twistd", "-y", "client.tac"]
165         env = os.environ.copy()
166         self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
167         log.msg("CLIENT STARTED")
168
169         # now we wait for the client to get started. we're looking for the
170         # control.furl file to appear.
171         furl_file = os.path.join(clientdir, "control.furl")
172         def _check():
173             return os.path.exists(furl_file)
174         d = self.poll(_check, 0.1)
175         # once it exists, wait a moment before we read from it, just in case
176         # it hasn't finished writing the whole thing. Ideally control.furl
177         # would be created in some atomic fashion, or made non-readable until
178         # it's ready, but I can't think of an easy way to do that, and I
179         # think the chances that we'll observe a half-write are pretty low.
180         def _stall(res):
181             d2 = defer.Deferred()
182             reactor.callLater(0.1, d2.callback, None)
183             return d2
184         d.addCallback(_stall)
185         def _read(res):
186             f = open(furl_file, "r")
187             furl = f.read()
188             return furl.strip()
189         d.addCallback(_read)
190         return d
191
192
193     def kill_client(self):
194         # returns a Deferred that fires when the process exits. This may only
195         # be called once.
196         try:
197             self.proc.signalProcess("KILL")
198         except error.ProcessExitedAlready:
199             pass
200         return self.proc_done
201
202
203     def create_data(self, name, size):
204         filename = os.path.join(self.basedir, name + ".data")
205         f = open(filename, "wb")
206         block = "a" * 8192
207         while size > 0:
208             l = min(size, 8192)
209             f.write(block[:l])
210             size -= l
211         return filename
212
213     def stash_stats(self, stats, name):
214         self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
215         self.stats[name] = stats['VmPeak']
216
217     def do_test(self):
218         #print "CLIENT STARTED"
219         #print "FURL", self.control_furl
220         #print "RREF", self.control_rref
221         #print
222         kB = 1000; MB = 1000*1000
223         files = {}
224         uris = {}
225         control = self.control_rref
226
227         def _print_usage(res=None):
228             d = control.callRemote("get_memory_usage")
229             def _print(stats):
230                 print "VmSize: %9d  VmPeak: %9d" % (stats["VmSize"],
231                                                     stats["VmPeak"])
232                 return stats
233             d.addCallback(_print)
234             return d
235
236         def _do_upload(res, size):
237             name = '%d' % size
238             files[name] = self.create_data(name, size)
239             print
240             print "uploading %s" % name
241             d = control.callRemote("upload_from_file_to_uri", files[name])
242             def _done(uri):
243                 uris[name] = uri
244                 os.remove(files[name])
245                 del files[name]
246                 print "uploaded %s" % name
247             d.addCallback(_done)
248             return d
249
250         d = _print_usage()
251
252         for i in range(10):
253             d.addCallback(_do_upload, size=10*kB+i)
254             d.addCallback(_print_usage)
255         d.addCallback(self.stash_stats, "10kB")
256
257         for i in range(3):
258             d.addCallback(_do_upload, size=10*MB+i)
259             d.addCallback(_print_usage)
260         d.addCallback(self.stash_stats, "10MB")
261
262         #for i in range(1):
263         #    d.addCallback(_do_upload, size=50*MB+i)
264         #    d.addCallback(_print_usage)
265         #d.addCallback(self.stash_stats, "50MB")
266
267         #d.addCallback(self.stall)
268         def _done(res):
269             print "FINISHING"
270         d.addCallback(_done)
271         return d
272
273     def stall(self, res):
274         d = defer.Deferred()
275         reactor.callLater(5, d.callback, None)
276         return d
277
278
class ClientWatcher(protocol.ProcessProtocol):
    """Echoes the client subprocess's output and reports its exit.

    The owner assigns a Deferred to .d before spawning; it is fired
    (with None) when the child process ends.
    """

    def outReceived(self, data):
        sys.stdout.write("OUT: %s\n" % (data,))

    def errReceived(self, data):
        sys.stdout.write("ERR: %s\n" % (data,))

    def processEnded(self, reason):
        # the exit reason is ignored: the harness only needs to know
        # that the child is gone
        self.d.callback(None)
286
287
if __name__ == '__main__':
    # optional first argument selects the test mode; default is "upload"
    mode = "upload"
    if len(sys.argv) > 1:
        mode = sys.argv[1]
    framework = SystemFramework("_test_memory", mode)
    framework.run()
294