]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_system.py
b1fc0be65d7e097b2bf486fb795523336427a7ba
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_system.py
1
2 import os
3 from twisted.trial import unittest
4 from twisted.internet import defer, reactor
5 from twisted.application import service
6 from allmydata import client, queen
7 from allmydata.util import idlib, fileutil
8 from foolscap.eventual import flushEventualQueue
9 from twisted.python import log
10 from twisted.web.client import getPage
11
def flush_but_dont_ignore(res):
    """Flush the foolscap eventual-send queue, then pass `res` through.

    Returns the Deferred obtained from flushEventualQueue(), with a
    callback added so that it fires with the original `res` instead of
    whatever the flush produced. This lets the function sit in a
    callback chain without discarding the value flowing through it.
    """
    flushed = flushEventualQueue()
    flushed.addCallback(lambda _ignored: res)
    return flushed
18
class SystemTest(unittest.TestCase):
    """System tests that run a Queen and several Client nodes in-process.

    Each test method must set self.basedir before calling set_up_nodes();
    every node's base directory is created underneath it.
    """

    def setUp(self):
        # Everything attached via add_service() is stopped by tearDown().
        self.sparent = service.MultiService()
        self.sparent.startService()

    def tearDown(self):
        log.msg("shutting down SystemTest services")
        d = self.sparent.stopService()
        # flush the eventual queue while passing the result/Failure through
        d.addBoth(flush_but_dont_ignore)
        return d

    def getdir(self, subdir):
        """Return the path of `subdir` underneath this test's basedir."""
        return os.path.join(self.basedir, subdir)

    def add_service(self, s):
        """Parent service `s` to self.sparent and return it."""
        s.setServiceParent(self.sparent)
        return s

    def _write_file(self, path, contents):
        """Write `contents` to `path` (text mode), always closing the file."""
        f = open(path, "w")
        try:
            f.write(contents)
        finally:
            f.close()

    def _read_file(self, path):
        """Return the full binary contents of `path`, always closing it."""
        f = open(path, "rb")
        try:
            return f.read()
        finally:
            f.close()

    def set_up_nodes(self, NUMCLIENTS=5):
        """Start the queen, then NUMCLIENTS clients.

        Returns a Deferred that fires once _set_up_nodes_2 has finished,
        i.e. after wait_for_connections() reports all clients connected.
        """
        self.numclients = NUMCLIENTS
        queendir = self.getdir("queen")
        if not os.path.isdir(queendir):
            fileutil.make_dirs(queendir)
        self.queen = self.add_service(queen.Queen(basedir=queendir))
        d = self.queen.when_tub_ready()
        d.addCallback(self._set_up_nodes_2)
        return d

    def _set_up_nodes_2(self, res):
        """Create and start the clients once the queen's tub is ready.

        `res` is ignored. Records the queen's introducer and vdrive FURLs,
        writes them into each client's basedir, starts the clients, waits
        for them to connect, and finally sets self.webish_url from the port
        that client 0's web listener ended up on.
        """
        q = self.queen
        self.queen_furl = q.urls["introducer"]
        self.vdrive_furl = q.urls["vdrive"]
        self.clients = []
        for i in range(self.numclients):
            basedir = self.getdir("client%d" % i)
            if not os.path.isdir(basedir):
                fileutil.make_dirs(basedir)
            if i == 0:
                # only client 0 gets a "webport" file; port 0 means the
                # actual port number is looked up after startup (below)
                self._write_file(os.path.join(basedir, "webport"),
                                 "tcp:0:interface=127.0.0.1")
            self._write_file(os.path.join(basedir, "introducer.furl"),
                             self.queen_furl)
            self._write_file(os.path.join(basedir, "vdrive.furl"),
                             self.vdrive_furl)
            c = self.add_service(client.Client(basedir=basedir))
            self.clients.append(c)
        log.msg("STARTING")
        d = self.wait_for_connections()
        def _connected(res):
            # now find out where the web port was
            listener = self.clients[0].getServiceNamed("webish").listener
            port = listener._port.getHost().port
            self.webish_url = "http://localhost:%d/" % port
        d.addCallback(_connected)
        return d

    def add_extra_node(self, client_num):
        """Start one more client and wait until everybody is connected.

        Returns a Deferred that fires with the new client. The caller is
        responsible for stopping it.
        """
        # this node is *not* parented to our self.sparent, so we can shut it
        # down separately from the rest, to exercise the connection-lost code
        basedir = self.getdir("client%d" % client_num)
        if not os.path.isdir(basedir):
            fileutil.make_dirs(basedir)
        self._write_file(os.path.join(basedir, "introducer.furl"),
                         self.queen_furl)
        self._write_file(os.path.join(basedir, "vdrive.furl"),
                         self.vdrive_furl)

        c = client.Client(basedir=basedir)
        self.clients.append(c)
        self.numclients += 1
        c.startService()
        d = self.wait_for_connections()
        d.addCallback(lambda res: c)
        return d

    def wait_for_connections(self, ignored=None):
        """Poll until every client reports self.numclients peer ids.

        Returns a Deferred. If any client has no introducer_client yet, or
        reports a different number of peer ids, the check is retried after
        0.05 seconds; otherwise the Deferred has already fired with None.
        `ignored` exists so this method can be used as a Deferred callback.
        """
        for c in self.clients:
            if (not c.introducer_client or
                len(list(c.get_all_peerids())) != self.numclients):
                d = defer.Deferred()
                d.addCallback(self.wait_for_connections)
                reactor.callLater(0.05, d.callback, None)
                return d
        return defer.succeed(None)

    def test_connections(self):
        # Start 5 clients plus one extra, and check that every client
        # reports 6 peer ids.
        self.basedir = "test_system/SystemTest/test_connections"
        d = self.set_up_nodes()
        self.extra_node = None
        d.addCallback(lambda res: self.add_extra_node(5))
        def _check(extra_node):
            self.extra_node = extra_node
            for c in self.clients:
                self.failUnlessEqual(len(list(c.get_all_peerids())), 6)
        d.addCallback(_check)
        def _shutdown_extra_node(res):
            # Runs on success *and* failure. After the extra node has been
            # stopped we must hand back the original result: if `res` is a
            # Failure (e.g. from an assertion in _check), returning only the
            # stopService() result would silently turn it into a success.
            if self.extra_node:
                d1 = defer.maybeDeferred(self.extra_node.stopService)
                d1.addCallback(lambda ignored: res)
                return d1
            return res
        d.addBoth(_shutdown_extra_node)
        return d
    test_connections.timeout = 300

    def test_upload_and_download(self):
        # Upload from client 0, then download from client 1 three ways:
        # to memory, to a named file, and to an open filehandle.
        self.basedir = "test_system/SystemTest/test_upload_and_download"
        # we use 4000 bytes of data, which will result in about 400k written
        # to disk among all our simulated nodes
        DATA = "Some data to upload\n" * 200
        d = self.set_up_nodes()
        def _do_upload(res):
            log.msg("UPLOADING")
            u = self.clients[0].getServiceNamed("uploader")
            # we crank the max segsize down to 1024b for the duration of this
            # test, so we can exercise multiple segments. It is important
            # that len(DATA) is not a multiple of the segment size, so that
            # the tail segment is not the same length as the others.
            options = {"max_segment_size": 1024}
            d1 = u.upload_data(DATA, options)
            return d1
        d.addCallback(_do_upload)
        def _upload_done(uri):
            log.msg("upload finished: uri is %s" % (uri,))
            self.uri = uri
            dl = self.clients[1].getServiceNamed("downloader")
            self.downloader = dl
            d1 = dl.download_to_data(uri)
            return d1
        d.addCallback(_upload_done)
        def _download_done(data):
            log.msg("download finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_download_done)

        target_filename = os.path.join(self.basedir, "download.target")
        def _download_to_filename(res):
            return self.downloader.download_to_filename(self.uri,
                                                        target_filename)
        d.addCallback(_download_to_filename)
        def _download_to_filename_done(res):
            newdata = self._read_file(target_filename)
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filename_done)

        target_filename2 = os.path.join(self.basedir, "download.target2")
        def _download_to_filehandle(res):
            fh = open(target_filename2, "wb")
            d1 = self.downloader.download_to_filehandle(self.uri, fh)
            def _close(res2):
                # close on success *and* failure so the handle never leaks;
                # the result (or Failure) is passed through untouched
                fh.close()
                return res2
            d1.addBoth(_close)
            return d1
        d.addCallback(_download_to_filehandle)
        def _download_to_filehandle_done(ignored):
            # the filehandle was already closed by _close above, so the
            # data is on disk by the time we read it back
            newdata = self._read_file(target_filename2)
            self.failUnlessEqual(newdata, DATA)
        d.addCallback(_download_to_filehandle_done)

        return d
    test_upload_and_download.timeout = 600

    def test_vdrive(self):
        # Publish a file through client 0's vdrive, fetch it through
        # client 1's vdrive, then check the web interface via _test_web.
        self.basedir = "test_system/SystemTest/test_vdrive"
        self.data = DATA = "Some data to publish to the virtual drive\n"
        d = self.set_up_nodes()
        def _do_publish(res):
            log.msg("PUBLISHING")
            v0 = self.clients[0].getServiceNamed("vdrive")
            d1 = v0.make_directory("/", "subdir1")
            d1.addCallback(lambda subdir1:
                           v0.put_file_by_data(subdir1, "mydata567", DATA))
            return d1
        d.addCallback(_do_publish)
        def _publish_done(res):
            log.msg("publish finished")
            v1 = self.clients[1].getServiceNamed("vdrive")
            d1 = v1.get_file_to_data("/subdir1/mydata567")
            return d1
        d.addCallback(_publish_done)
        def _get_done(data):
            log.msg("get finished")
            self.failUnlessEqual(data, DATA)
        d.addCallback(_get_done)
        d.addCallback(self._test_web)
        return d
    test_vdrive.timeout = 300

    def _test_web(self, res):
        """Fetch pages from client 0's web server and check their contents.

        Relies on self.webish_url (set by _set_up_nodes_2) and self.data
        (set by test_vdrive). `res` is ignored. Returns a Deferred that
        fires after the welcome page, the subdir1 listing, and the
        published file have all been fetched and checked.
        """
        base = self.webish_url
        d = getPage(base)
        def _got_welcome(page):
            expected = "Connected Peers: <span>%d</span>" % (self.numclients)
            self.failUnless(expected in page,
                            "I didn't see the right 'connected peers' message "
                            "in: %s" % page
                            )
            expected = "My nodeid: <span>%s</span>" % idlib.b2a(self.clients[0].nodeid)
            self.failUnless(expected in page,
                            "I didn't see the right 'My nodeid' message "
                            "in: %s" % page)
        d.addCallback(_got_welcome)
        d.addCallback(lambda res: getPage(base + "vdrive/subdir1"))
        def _got_subdir1(page):
            # there ought to be an href for our file
            self.failUnless(">mydata567</a>" in page)
        d.addCallback(_got_subdir1)
        d.addCallback(lambda res: getPage(base + "vdrive/subdir1/mydata567"))
        def _got_data(page):
            self.failUnlessEqual(page, self.data)
        d.addCallback(_got_data)
        return d
221