4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from twisted.web.client import getPage
8 from allmydata import client, introducer_and_vdrive
9 from allmydata.scripts import create_node
10 from allmydata.util import testutil
12 from foolscap import eventual
13 from twisted.python import log
15 class SystemFramework(testutil.PollMixin):
def __init__(self, basedir, mode):
    """Prepare a fresh working directory and the parent service for a run.

    basedir: directory that will hold every file this harness creates. Any
             existing directory at that path is deleted, so it must live
             strictly below the current working directory.
    mode: name of the upload strategy to exercise. Stored as self.mode,
          which other methods read to pick a code path and label results.

    Raises AssertionError if basedir is not a subdirectory of the current
    working directory.
    """
    self.basedir = basedir = os.path.abspath(basedir)
    # Compare against the cwd *including a trailing separator*. A bare
    # prefix test would also accept a sibling such as "<cwd>-old", and
    # would accept the cwd itself -- which the rmtree below would then
    # destroy.
    here = os.path.join(os.path.abspath("."), "")
    if not basedir.startswith(here):
        raise AssertionError("safety issue: basedir must be a subdir")
    if os.path.exists(basedir):
        shutil.rmtree(basedir)
    # Start from an empty directory: the log, stats and node files are all
    # opened underneath it.
    os.makedirs(basedir)
    self.sparent = service.MultiService()
    self.sparent.startService()
    self.tub = foolscap.Tub()
    self.tub.setServiceParent(self.sparent)
    self.discard_shares = True
    self.mode = mode
34 log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
36 #logfile = open(os.path.join(self.basedir, "log"), "w")
37 #flo = log.FileLogObserver(logfile)
38 #log.startLoggingWithObserver(flo.emit, setStdout=False)
39 d = eventual.fireEventually()
40 d.addCallback(lambda res: self.setUp())
41 d.addCallback(lambda res: self.do_test())
42 d.addBoth(self.tearDown)
47 d.addBoth(lambda res: reactor.stop())
53 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
54 d = self.make_introducer_and_vdrive()
57 return self.start_client()
59 def _record_control_furl(control_furl):
60 self.control_furl = control_furl
61 #print "OBTAINING '%s'" % (control_furl,)
62 return self.tub.getReference(self.control_furl)
63 d.addCallback(_record_control_furl)
64 def _record_control(control_rref):
65 self.control_rref = control_rref
66 return control_rref.callRemote("wait_for_client_connections",
68 d.addCallback(_record_control)
def tearDown(self, passthrough):
    """Stop the client process and every in-process service.

    Meant to be attached with addBoth: `passthrough` is whatever result (or
    Failure) the preceding steps produced.

    Returns a Deferred that fires with `passthrough` once the client process
    has been killed, the services have stopped, the eventual-send queue has
    been flushed and the stats file has been closed.
    """
    # the client node will shut down in a few seconds
    log.msg("shutting down SystemTest services")
    d = defer.succeed(None)
    # Only try to kill the client if one was actually spawned; otherwise an
    # AttributeError here would skip the rest of the cleanup.
    if getattr(self, "proc", None) is not None:
        d.addCallback(lambda res: self.kill_client())
    d.addCallback(lambda res: self.sparent.stopService())
    d.addCallback(lambda res: eventual.flushEventualQueue())
    def _close_statsfile(res):
        self.statsfile.close()
    d.addCallback(_close_statsfile)
    d.addCallback(lambda res: passthrough)
    # The caller chains on this Deferred; without returning it the cleanup
    # would not be waited for and `passthrough` would be lost.
    return d
def add_service(self, s):
    """Attach service `s` to this harness's parent service.

    Returns `s` itself, so callers can create, attach and keep a reference
    to a service in a single expression.
    """
    s.setServiceParent(self.sparent)
    return s
def make_introducer_and_vdrive(self):
    """Start the combined introducer/vdrive service.

    The service is given <basedir>/introducer_and_vdrive as its base
    directory and is remembered as self.introducer_and_vdrive.

    Returns the Deferred produced by the service's when_tub_ready().
    """
    iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
    # NOTE(review): nothing here creates iv_basedir; presumably the service
    # does so itself -- confirm.
    iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
    self.add_service(iv)
    self.introducer_and_vdrive = iv
    return iv.when_tub_ready()
def make_nodes(self):
    """Create and start self.numnodes in-process client nodes.

    Records the introducer and vdrive FURLs published by
    self.introducer_and_vdrive, then gives each node its own directory
    <basedir>/node<i> holding those FURLs before starting a Client on it.
    """
    q = self.introducer_and_vdrive
    self.introducer_furl = q.urls["introducer"]
    self.vdrive_furl = q.urls["vdrive"]

    for i in range(self.numnodes):
        nodedir = os.path.join(self.basedir, "node%d" % i)
        if not os.path.isdir(nodedir):
            os.makedirs(nodedir)
        # Each file is closed (and so flushed) before the Client is built on
        # this directory.
        with open(os.path.join(nodedir, "introducer.furl"), "w") as f:
            f.write(self.introducer_furl)
        with open(os.path.join(nodedir, "vdrive.furl"), "w") as f:
            f.write(self.vdrive_furl)
        if self.discard_shares:
            # for this test, we tell the storage servers to throw out all
            # their stored data, since this test only exercises upload
            with open(os.path.join(nodedir, "debug_no_storage"), "w") as f:
                f.write("no_storage\n")
        self.add_service(client.Client(basedir=nodedir))
    # the peers will start running, eventually they will connect to each
    # other and the introducer_and_vdrive
def touch_keepalive(self):
    """Rewrite the keepalive file so that its modification time is current.

    The text written is purely informational: it explains to anyone who
    finds the file what it is for.
    """
    with open(self.keepalive_file, "w") as f:
        f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
""")
def start_client(self):
    """Create the client node on disk, launch it with twistd and wait for it.

    Returns a Deferred that fires with the client's control.furl (the
    contents of <clientdir>/control.furl) once the child has written it.
    """
    log.msg("MAKING CLIENT")
    clientdir = self.clientdir = os.path.join(self.basedir, "client")
    quiet = StringIO()  # swallow whatever create_client prints
    create_node.create_client(clientdir, {}, out=quiet)
    log.msg("DONE MAKING CLIENT")

    def _write(name, contents):
        # Close every config file before the child process is spawned, so
        # the child never sees a partially flushed file.
        with open(os.path.join(clientdir, name), "w") as f:
            f.write(contents)

    _write("introducer.furl", self.introducer_furl + "\n")
    _write("vdrive.furl", self.vdrive_furl + "\n")
    # TODO: ideally we would set webport=0 and then ask the node what
    # port it picked. But at the moment it is not convenient to do this,
    # so we just pick a relatively unique one.
    pid = os.getpid()
    webport = max(pid, 2000)
    if webport > 65535:
        # pids can exceed the TCP port range; fold them back into it
        webport = 2000 + pid % (65536 - 2000)
    _write("webport", "tcp:%d:interface=127.0.0.1\n" % webport)
    self.webish_url = "http://localhost:%d" % webport
    if self.discard_shares:
        _write("debug_no_storage", "no_storage\n")
    self.keepalive_file = os.path.join(clientdir,
                                       "suicide_prevention_hotline")
    # now start updating the mtime.
    self.touch_keepalive()
    ts = internet.TimerService(4.0, self.touch_keepalive)
    ts.setServiceParent(self.sparent)

    pp = ClientWatcher()
    self.proc_done = pp.d = defer.Deferred()
    cmd = ["twistd", "-y", "client.tac"]
    env = os.environ.copy()
    self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
    log.msg("CLIENT STARTED")

    # now we wait for the client to get started. we're looking for the
    # control.furl file to appear.
    furl_file = os.path.join(clientdir, "control.furl")
    def _check():
        return os.path.exists(furl_file)
    d = self.poll(_check, 0.1)
    # once it exists, wait a moment before we read from it, just in case
    # it hasn't finished writing the whole thing. Ideally control.furl
    # would be created in some atomic fashion, or made non-readable until
    # it's ready, but I can't think of an easy way to do that, and I
    # think the chances that we'll observe a half-write are pretty low.
    def _stall(res):
        d2 = defer.Deferred()
        reactor.callLater(0.1, d2.callback, None)
        return d2
    d.addCallback(_stall)
    def _read(res):
        with open(furl_file, "r") as f:
            # NOTE(review): surrounding whitespace is stripped on the
            # assumption that the file ends with a newline -- confirm.
            return f.read().strip()
    d.addCallback(_read)
    return d
def kill_client(self):
    """Send SIGKILL to the client child process.

    Returns self.proc_done, the Deferred associated with the process exiting.
    A process that has already exited is not treated as an error.
    """
    try:
        self.proc.signalProcess("KILL")
    except error.ProcessExitedAlready:
        # nothing left to kill
        pass
    return self.proc_done
def create_data(self, name, size):
    """Create the file <basedir>/<name>.data holding exactly `size` bytes.

    Returns the path of the file that was written.
    """
    filename = os.path.join(self.basedir, name + ".data")
    # NOTE(review): the fill byte is arbitrary; only the length is assumed
    # to matter to callers -- confirm.
    chunk = b"a" * 8192
    remaining = size
    with open(filename, "wb") as f:
        # write in bounded chunks so a large file never has to be held in
        # memory in one piece
        while remaining > 0:
            n = min(remaining, len(chunk))
            f.write(chunk[:n])
            remaining -= n
    return filename
def stash_stats(self, stats, name):
    """Record the 'VmPeak' entry of `stats` under the label `name`.

    One "<mode> <name>: <VmPeak>" line is appended to self.statsfile, and the
    same value is stored in the self.stats dict keyed by `name`.
    """
    line = "%s %s: %d\n" % (self.mode, name, stats['VmPeak'])
    self.statsfile.write(line)
    peak = stats['VmPeak']
    self.stats[name] = peak
def POST(self, urlpath, **fields):
    """POST `fields` to the client's web server as multipart/form-data.

    urlpath: path appended to self.webish_url.
    fields: the form fields. A plain value becomes an ordinary field; a
            (filename, contents) tuple becomes a file-upload field.

    Returns the Deferred from getPage. Redirects are not followed.
    """
    url = self.webish_url + urlpath
    sepbase = "boogabooga"
    sep = "--" + sepbase
    # NOTE(review): the value sent for the "_charset" field is assumed to be
    # UTF-8 -- confirm.
    form = [
        sep,
        'Content-Disposition: form-data; name="_charset"',
        '',
        'UTF-8',
        sep,
    ]
    for name, value in fields.items():
        if isinstance(value, tuple):
            filename, value = value
            form.append('Content-Disposition: form-data; name="%s"; '
                        'filename="%s"' % (name, filename))
        else:
            form.append('Content-Disposition: form-data; name="%s"' % name)
        # a blank line separates each part's headers from its content
        form.append('')
        form.append(value)
        form.append(sep)
    # the final boundary carries a trailing "--" to close the body
    form[-1] += "--"
    body = "\r\n".join(form) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
               }
    return getPage(url, method="POST", postdata=body,
                   headers=headers, followRedirect=False)
254 #print "CLIENT STARTED"
255 #print "FURL", self.control_furl
256 #print "RREF", self.control_rref
258 kB = 1000; MB = 1000*1000
261 control = self.control_rref
263 def _print_usage(res=None):
264 d = control.callRemote("get_memory_usage")
266 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
269 d.addCallback(_print)
272 def _do_upload(res, size):
275 print "uploading %s" % name
276 if self.mode == "upload":
277 files[name] = self.create_data(name, size)
278 d = control.callRemote("upload_from_file_to_uri", files[name])
280 os.remove(files[name])
284 elif self.mode == "upload-POST":
286 url = "/vdrive/global"
287 d = self.POST(url, t="upload", file=("%d.data" % size, data))
289 raise RuntimeError("unknown mode=%s" % self.mode)
292 print "uploaded %s" % name
293 d.addCallback(_complete)
297 d.addCallback(self.stash_stats, "0B")
300 d.addCallback(_do_upload, size=10*kB+i)
301 d.addCallback(_print_usage)
302 d.addCallback(self.stash_stats, "10kB")
305 d.addCallback(_do_upload, size=10*MB+i)
306 d.addCallback(_print_usage)
307 d.addCallback(self.stash_stats, "10MB")
310 d.addCallback(_do_upload, size=50*MB+i)
311 d.addCallback(_print_usage)
312 d.addCallback(self.stash_stats, "50MB")
315 d.addCallback(_do_upload, size=100*MB+i)
316 d.addCallback(_print_usage)
317 d.addCallback(self.stash_stats, "100MB")
319 #d.addCallback(self.stall)
def stall(self, res):
    """Return a Deferred that fires with None five seconds from now.

    `res` is accepted so this can be used directly as a callback; it is
    ignored.
    """
    d = defer.Deferred()
    reactor.callLater(5, d.callback, None)
    return d
class ClientWatcher(protocol.ProcessProtocol):
    """Process protocol attached to the spawned client node.

    Whoever creates an instance must set its `d` attribute to a Deferred;
    that Deferred is fired with None when the child process ends.
    """

    # NOTE(review): the original bodies of outReceived/errReceived are not
    # visible; routing the child's output to the twisted log is an
    # assumption -- confirm.
    def outReceived(self, data):
        log.msg("client stdout: %s" % (data,))

    def errReceived(self, data):
        log.msg("client stderr: %s" % (data,))

    def processEnded(self, reason):
        self.d.callback(None)
340 if __name__ == '__main__':
342 if len(sys.argv) > 1:
344 sf = SystemFramework("_test_memory", mode)