4 from cStringIO import StringIO
5 from twisted.internet import defer, reactor, protocol, error
6 from twisted.application import service, internet
7 from allmydata import client, introducer_and_vdrive
8 from allmydata.scripts import create_node
9 from allmydata.util import testutil
11 from foolscap import eventual
12 from twisted.python import log
14 class SystemFramework(testutil.PollMixin):
# Prepare a clean working directory and the in-process services for one run.
#   basedir: directory that holds everything the run creates; made absolute.
#   mode: accepted here; NOTE(review): where it gets stored is not visible in
#         these lines (stash_stats reads self.mode) -- confirm.
17 def __init__(self, basedir, mode):
18 self.basedir = basedir = os.path.abspath(basedir)
# Safety check: the directory is deleted just below, so refuse any path that
# does not start with the absolute current directory (a string-prefix test).
19 if not basedir.startswith(os.path.abspath(".")):
20 raise AssertionError("safety issue: basedir must be a subdir")
# Remove whatever an earlier run left behind.
21 if os.path.exists(basedir):
22 shutil.rmtree(basedir)
# Parent service, started immediately; other services are attached to it.
24 self.sparent = service.MultiService()
25 self.sparent.startService()
# Tub attached to the parent service; the harness later calls
# self.tub.getReference() on it to reach the client's control object.
27 self.tub = foolscap.Tub()
28 self.tub.setServiceParent(self.sparent)
# When true, a "debug_no_storage" file is written into each node directory.
29 self.discard_shares = True
33 log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
35 #logfile = open(os.path.join(self.basedir, "log"), "w")
36 #flo = log.FileLogObserver(logfile)
37 #log.startLoggingWithObserver(flo.emit, setStdout=False)
38 d = eventual.fireEventually()
39 d.addCallback(lambda res: self.setUp())
40 d.addCallback(lambda res: self.do_test())
41 d.addBoth(self.tearDown)
46 d.addBoth(lambda res: reactor.stop())
52 self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
53 d = self.make_introducer_and_vdrive()
56 return self.start_client()
58 def _record_control_furl(control_furl):
59 self.control_furl = control_furl
60 #print "OBTAINING '%s'" % (control_furl,)
61 return self.tub.getReference(self.control_furl)
62 d.addCallback(_record_control_furl)
63 def _record_control(control_rref):
64 self.control_rref = control_rref
65 return control_rref.callRemote("wait_for_client_connections",
67 d.addCallback(_record_control)
# Shut the harness down: kill the client process, stop the in-process
# services, flush foolscap's eventual-send queue and close the stats file.
#   passthrough: value returned by the last callback added here, so whatever
#                this method was called with is what the chain carries onward.
74 def tearDown(self, passthrough):
75 # the client node will shut down in a few seconds
76 #os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
77 log.msg("shutting down SystemTest services")
# Start from an already-fired Deferred and queue the cleanup steps in order.
78 d = defer.succeed(None)
80 d.addCallback(lambda res: self.kill_client())
81 d.addCallback(lambda res: self.sparent.stopService())
82 d.addCallback(lambda res: eventual.flushEventualQueue())
# The stats file is closed only after the steps above have run.
83 def _close_statsfile(res):
84 self.statsfile.close()
85 d.addCallback(_close_statsfile)
# Replace the chain's result with the value tearDown was called with.
86 d.addCallback(lambda res: passthrough)
# Attach service s to the harness's parent MultiService (self.sparent).
# NOTE(review): callers assign the result of add_service(), so a return value
# is expected; no return statement is visible in these lines -- confirm.
89 def add_service(self, s):
90 s.setServiceParent(self.sparent)
# Create the combined introducer/vdrive service, rooted at
# <basedir>/introducer_and_vdrive, and attach it to the parent service.
93 def make_introducer_and_vdrive(self):
94 iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
96 iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
# Keep what add_service() hands back so later steps can read its "urls".
97 self.introducer_and_vdrive = self.add_service(iv)
# presumably a Deferred that fires once the service's Tub is usable -- TODO confirm
98 d = self.introducer_and_vdrive.when_tub_ready()
# Set up self.numnodes client nodes inside this process, each using its own
# directory <basedir>/node<i> that receives the introducer and vdrive FURLs.
101 def make_nodes(self):
# Read both FURLs from the introducer/vdrive service's "urls" mapping and
# keep them on self; start_client() reads the same attributes.
102 q = self.introducer_and_vdrive
103 self.introducer_furl = q.urls["introducer"]
104 self.vdrive_furl = q.urls["vdrive"]
106 for i in range(self.numnodes):
107 nodedir = os.path.join(self.basedir, "node%d" % i)
# FURL files for this node (written without a trailing newline).
109 f = open(os.path.join(nodedir, "introducer.furl"), "w")
110 f.write(self.introducer_furl)
112 f = open(os.path.join(nodedir, "vdrive.furl"), "w")
113 f.write(self.vdrive_furl)
115 if self.discard_shares:
116 # for this test, we tell the storage servers to throw out all
117 # their stored data, since we're only testing upload and not
119 f = open(os.path.join(nodedir, "debug_no_storage"), "w")
120 f.write("no_storage\n")
# Build the node from its directory and attach it to the parent service.
122 c = self.add_service(client.Client(basedir=nodedir))
124 # the peers will start running, eventually they will connect to each
125 # other and the introducer_and_vdrive
# Open the keepalive file (self.keepalive_file) for writing. start_client()
# calls this once and then schedules it every 4 seconds via a TimerService.
# NOTE(review): the prose lines below look like the body of a string literal
# whose surrounding statement is not visible here -- confirm before editing.
127 def touch_keepalive(self):
128 f = open(self.keepalive_file, "w")
130 If the node notices this file at startup, it will poll every 5 seconds and
131 terminate if the file is more than 10 seconds old, or if it has been deleted.
132 If the test harness has an internal failure and neglects to kill off the node
133 itself, this helps to avoid leaving processes lying around. The contents of
134 this file are ignored.
# Create a client node directory at <basedir>/client, launch the node as a
# separate "twistd" process, then wait for its control.furl file to appear.
138 def start_client(self):
139 # this returns a Deferred that fires with the client's control.furl
140 log.msg("MAKING CLIENT")
141 clientdir = self.clientdir = os.path.join(self.basedir, "client")
# NOTE(review): "quiet" is defined on a line not visible here; presumably an
# in-memory stream that swallows create_client's output -- confirm.
143 create_node.create_client(clientdir, {}, out=quiet)
144 log.msg("DONE MAKING CLIENT")
# Tell the client where the introducer and vdrive are, via FURL files
# (newline-terminated here).
145 f = open(os.path.join(clientdir, "introducer.furl"), "w")
146 f.write(self.introducer_furl + "\n")
148 f = open(os.path.join(clientdir, "vdrive.furl"), "w")
149 f.write(self.vdrive_furl + "\n")
151 if self.discard_shares:
152 f = open(os.path.join(clientdir, "debug_no_storage"), "w")
153 f.write("no_storage\n")
# Keepalive file inside the client directory; it is touched once now and
# then every 4 seconds by the TimerService attached to the parent service.
155 self.keepalive_file = os.path.join(clientdir,
156 "suicide_prevention_hotline")
157 # now start updating the mtime.
158 self.touch_keepalive()
159 ts = internet.TimerService(4.0, self.touch_keepalive)
160 ts.setServiceParent(self.sparent)
# One Deferred shared between the harness (self.proc_done) and the process
# protocol (pp.d); kill_client() returns it.
# NOTE(review): pp is created on a line not visible here; presumably a
# ClientWatcher, whose processEnded fires self.d -- confirm.
163 self.proc_done = pp.d = defer.Deferred()
# Run the node's client.tac under twistd, with clientdir as the working
# directory and a copy of this process's environment.
164 cmd = ["twistd", "-y", "client.tac"]
165 env = os.environ.copy()
166 self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
167 log.msg("CLIENT STARTED")
169 # now we wait for the client to get started. we're looking for the
170 # control.furl file to appear.
171 furl_file = os.path.join(clientdir, "control.furl")
173 return os.path.exists(furl_file)
# Poll the existence check through PollMixin's poll(); presumably 0.1 is the
# polling interval in seconds -- TODO confirm.
174 d = self.poll(_check, 0.1)
175 # once it exists, wait a moment before we read from it, just in case
176 # it hasn't finished writing the whole thing. Ideally control.furl
177 # would be created in some atomic fashion, or made non-readable until
178 # it's ready, but I can't think of an easy way to do that, and I
179 # think the chances that we'll observe a half-write are pretty low.
# d2 is fired with None 0.1 seconds from now.
181 d2 = defer.Deferred()
182 reactor.callLater(0.1, d2.callback, None)
184 d.addCallback(_stall)
186 f = open(furl_file, "r")
# Send KILL to the spawned client process and return self.proc_done, the
# Deferred that start_client() shares with the process protocol.
193 def kill_client(self):
194 # returns a Deferred that fires when the process exits. This may only
197 self.proc.signalProcess("KILL")
# A process that has already exited makes signalProcess raise
# ProcessExitedAlready, which is caught here.
198 except error.ProcessExitedAlready:
200 return self.proc_done
# Build the path <basedir>/<name>.data and open it for binary writing.
#   name: base name of the data file (".data" is appended).
#   size: NOTE(review): not used in the visible lines; presumably the number
#         of bytes to write into the file -- confirm.
203 def create_data(self, name, size):
204 filename = os.path.join(self.basedir, name + ".data")
205 f = open(filename, "wb")
def stash_stats(self, stats, name):
    """Record the 'VmPeak' figure from one memory-usage sample.

    A line of the form "<mode> <name>: <VmPeak>" is written to the open
    stats file, and the same figure is stored in self.stats under name.

    stats -- mapping that provides a 'VmPeak' entry
    name -- label identifying the sample (for example "10kB")
    """
    peak = stats['VmPeak']
    record = "%s %s: %d\n" % (self.mode, name, peak)
    self.statsfile.write(record)
    self.stats[name] = peak
218 #print "CLIENT STARTED"
219 #print "FURL", self.control_furl
220 #print "RREF", self.control_rref
222 kB = 1000; MB = 1000*1000
225 control = self.control_rref
227 def _print_usage(res=None):
228 d = control.callRemote("get_memory_usage")
230 print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
233 d.addCallback(_print)
236 def _do_upload(res, size):
238 files[name] = self.create_data(name, size)
240 print "uploading %s" % name
241 d = control.callRemote("upload_from_file_to_uri", files[name])
244 os.remove(files[name])
246 print "uploaded %s" % name
253 d.addCallback(_do_upload, size=10*kB+i)
254 d.addCallback(_print_usage)
255 d.addCallback(self.stash_stats, "10kB")
258 d.addCallback(_do_upload, size=10*MB+i)
259 d.addCallback(_print_usage)
260 d.addCallback(self.stash_stats, "10MB")
263 # d.addCallback(_do_upload, size=50*MB+i)
264 # d.addCallback(_print_usage)
265 #d.addCallback(self.stash_stats, "50MB")
267 #d.addCallback(self.stall)
# Delay helper: schedules d.callback(None) to run 5 seconds from now.
#   res: not used in the visible lines.
# NOTE(review): d is created on a line not visible here; presumably a new
# Deferred that this method also returns -- confirm.
273 def stall(self, res):
275 reactor.callLater(5, d.callback, None)
# Process protocol for the spawned client node. Instances are expected to
# have a Deferred assigned to their "d" attribute by whoever creates them.
279 class ClientWatcher(protocol.ProcessProtocol):
# Twisted calls this with data the child process wrote to its stdout.
280 def outReceived(self, data):
# Twisted calls this with data the child process wrote to its stderr.
282 def errReceived(self, data):
# The child has ended: fire self.d with None (the reason is not passed on).
284 def processEnded(self, reason):
285 self.d.callback(None)
# Script entry point: build a SystemFramework rooted at "_test_memory".
# NOTE(review): "mode" is assigned on lines not visible here; presumably a
# default that sys.argv[1] overrides when an argument is given -- confirm.
288 if __name__ == '__main__':
290 if len(sys.argv) > 1:
292 sf = SystemFramework("_test_memory", mode)