3 from twisted.trial import unittest
4 from twisted.internet import defer, reactor
5 from twisted.application import service
6 from allmydata import client, queen
7 from allmydata.util import idlib, fileutil
8 from foolscap.eventual import flushEventualQueue
9 from twisted.python import log
10 from twisted.web.client import getPage
12 def flush_but_dont_ignore(res):
13 d = flushEventualQueue()
19 class SystemTest(unittest.TestCase):
22 self.sparent = service.MultiService()
23 self.sparent.startService()
25 log.msg("shutting down SystemTest services")
26 d = self.sparent.stopService()
27 d.addBoth(flush_but_dont_ignore)
30 def getdir(self, subdir):
31 return os.path.join(self.basedir, subdir)
33 def add_service(self, s):
34 s.setServiceParent(self.sparent)
37 def set_up_nodes(self, NUMCLIENTS=5):
38 self.numclients = NUMCLIENTS
39 queendir = self.getdir("queen")
40 if not os.path.isdir(queendir):
41 fileutil.make_dirs(queendir)
42 self.queen = self.add_service(queen.Queen(basedir=queendir))
43 d = self.queen.when_tub_ready()
44 d.addCallback(self._set_up_nodes_2)
47 def _set_up_nodes_2(self, res):
49 self.queen_furl = q.urls["introducer"]
50 self.vdrive_furl = q.urls["vdrive"]
52 for i in range(self.numclients):
53 basedir = self.getdir("client%d" % i)
54 if not os.path.isdir(basedir):
55 fileutil.make_dirs(basedir)
57 open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
58 open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
59 open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
60 c = self.add_service(client.Client(basedir=basedir))
61 self.clients.append(c)
63 d = self.wait_for_connections()
65 # now find out where the web port was
66 l = self.clients[0].getServiceNamed("webish").listener
67 port = l._port.getHost().port
68 self.webish_url = "http://localhost:%d/" % port
69 d.addCallback(_connected)
72 def add_extra_node(self, client_num):
73 # this node is *not* parented to our self.sparent, so we can shut it
74 # down separately from the rest, to exercise the connection-lost code
75 basedir = self.getdir("client%d" % client_num)
76 if not os.path.isdir(basedir):
77 fileutil.make_dirs(basedir)
78 open(os.path.join(basedir, "introducer.furl"), "w").write(self.queen_furl)
79 open(os.path.join(basedir, "vdrive.furl"), "w").write(self.vdrive_furl)
81 c = client.Client(basedir=basedir)
82 self.clients.append(c)
85 d = self.wait_for_connections()
86 d.addCallback(lambda res: c)
89 def wait_for_connections(self, ignored=None):
90 for c in self.clients:
91 if (not c.introducer_client or
92 len(list(c.get_all_peerids())) != self.numclients):
94 d.addCallback(self.wait_for_connections)
95 reactor.callLater(0.05, d.callback, None)
97 return defer.succeed(None)
99 def test_connections(self):
100 self.basedir = "test_system/SystemTest/test_connections"
101 d = self.set_up_nodes()
102 self.extra_node = None
103 d.addCallback(lambda res: self.add_extra_node(5))
104 def _check(extra_node):
105 self.extra_node = extra_node
106 for c in self.clients:
107 self.failUnlessEqual(len(list(c.get_all_peerids())), 6)
108 d.addCallback(_check)
109 def _shutdown_extra_node(res):
111 return self.extra_node.stopService()
113 d.addBoth(_shutdown_extra_node)
115 test_connections.timeout = 300
117 def test_upload_and_download(self):
118 self.basedir = "test_system/SystemTest/test_upload_and_download"
119 # we use 4000 bytes of data, which will result in about 400k written
120 # to disk among all our simulated nodes
121 DATA = "Some data to upload\n" * 200
122 d = self.set_up_nodes()
125 u = self.clients[0].getServiceNamed("uploader")
126 # we crank the max segsize down to 1024b for the duration of this
127 # test, so we can exercise multiple segments. It is important
128 # that this is not a multiple of the segment size, so that the
129 # tail segment is not the same length as the others.
130 options = {"max_segment_size": 1024}
131 d1 = u.upload_data(DATA, options)
133 d.addCallback(_do_upload)
134 def _upload_done(uri):
135 log.msg("upload finished: uri is %s" % (uri,))
137 dl = self.clients[1].getServiceNamed("downloader")
139 d1 = dl.download_to_data(uri)
141 d.addCallback(_upload_done)
142 def _download_done(data):
143 log.msg("download finished")
144 self.failUnlessEqual(data, DATA)
145 d.addCallback(_download_done)
147 target_filename = os.path.join(self.basedir, "download.target")
148 def _download_to_filename(res):
149 return self.downloader.download_to_filename(self.uri,
151 d.addCallback(_download_to_filename)
152 def _download_to_filename_done(res):
153 newdata = open(target_filename, "rb").read()
154 self.failUnlessEqual(newdata, DATA)
155 d.addCallback(_download_to_filename_done)
157 target_filename2 = os.path.join(self.basedir, "download.target2")
158 def _download_to_filehandle(res):
159 fh = open(target_filename2, "wb")
160 return self.downloader.download_to_filehandle(self.uri, fh)
161 d.addCallback(_download_to_filehandle)
162 def _download_to_filehandle_done(fh):
164 newdata = open(target_filename2, "rb").read()
165 self.failUnlessEqual(newdata, DATA)
166 d.addCallback(_download_to_filehandle_done)
169 test_upload_and_download.timeout = 600
171 def test_vdrive(self):
172 self.basedir = "test_system/SystemTest/test_vdrive"
173 self.data = DATA = "Some data to publish to the virtual drive\n"
174 d = self.set_up_nodes()
175 def _do_publish(res):
176 log.msg("PUBLISHING")
177 v0 = self.clients[0].getServiceNamed("vdrive")
178 d1 = v0.make_directory("/", "subdir1")
179 d1.addCallback(lambda subdir1:
180 v0.put_file_by_data(subdir1, "mydata567", DATA))
182 d.addCallback(_do_publish)
183 def _publish_done(res):
184 log.msg("publish finished")
185 v1 = self.clients[1].getServiceNamed("vdrive")
186 d1 = v1.get_file_to_data("/subdir1/mydata567")
188 d.addCallback(_publish_done)
190 log.msg("get finished")
191 self.failUnlessEqual(data, DATA)
192 d.addCallback(_get_done)
193 d.addCallback(self._test_web)
195 test_vdrive.timeout = 300
197 def _test_web(self, res):
198 base = self.webish_url
200 def _got_welcome(page):
201 expected = "Connected Peers: <span>%d</span>" % (self.numclients)
202 self.failUnless(expected in page,
203 "I didn't see the right 'connected peers' message "
206 expected = "My nodeid: <span>%s</span>" % idlib.b2a(self.clients[0].nodeid)
207 self.failUnless(expected in page,
208 "I didn't see the right 'My nodeid' message "
210 d.addCallback(_got_welcome)
211 d.addCallback(lambda res: getPage(base + "vdrive/subdir1"))
212 def _got_subdir1(page):
213 # there ought to be an href for our file
214 self.failUnless(">mydata567</a>" in page)
215 d.addCallback(_got_subdir1)
216 d.addCallback(lambda res: getPage(base + "vdrive/subdir1/mydata567"))
218 self.failUnlessEqual(page, self.data)
219 d.addCallback(_got_data)