2 import re, errno, subprocess, os
4 from twisted.trial import unittest
6 from allmydata.util import iputil
7 import allmydata.test.common_util as testutil
13 DOTTED_QUAD_RE=re.compile("^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$")
15 MOCK_IPADDR_OUTPUT = """\
16 1: lo: <LOOPBACK,UP,LOWER_UP> mtu 16436 qdisc noqueue state UNKNOWN \n\
17 link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
18 inet 127.0.0.1/8 scope host lo
19 inet6 ::1/128 scope host \n\
20 valid_lft forever preferred_lft forever
21 2: eth1: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP qlen 1000
22 link/ether d4:3d:7e:01:b4:3e brd ff:ff:ff:ff:ff:ff
23 inet 192.168.0.6/24 brd 192.168.0.255 scope global eth1
24 inet6 fe80::d63d:7eff:fe01:b43e/64 scope link \n\
25 valid_lft forever preferred_lft forever
26 3: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq state UP qlen 1000
27 link/ether 90:f6:52:27:15:0a brd ff:ff:ff:ff:ff:ff
28 inet 192.168.0.2/24 brd 192.168.0.255 scope global wlan0
29 inet6 fe80::92f6:52ff:fe27:150a/64 scope link \n\
30 valid_lft forever preferred_lft forever
33 MOCK_IFCONFIG_OUTPUT = """\
34 eth1 Link encap:Ethernet HWaddr d4:3d:7e:01:b4:3e \n\
35 inet addr:192.168.0.6 Bcast:192.168.0.255 Mask:255.255.255.0
36 inet6 addr: fe80::d63d:7eff:fe01:b43e/64 Scope:Link
37 UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
38 RX packets:154242234 errors:0 dropped:0 overruns:0 frame:0
39 TX packets:155461891 errors:0 dropped:0 overruns:0 carrier:0
40 collisions:0 txqueuelen:1000 \n\
41 RX bytes:84367213640 (78.5 GiB) TX bytes:73401695329 (68.3 GiB)
42 Interrupt:20 Memory:f4f00000-f4f20000 \n\
44 lo Link encap:Local Loopback \n\
45 inet addr:127.0.0.1 Mask:255.0.0.0
46 inet6 addr: ::1/128 Scope:Host
47 UP LOOPBACK RUNNING MTU:16436 Metric:1
48 RX packets:27449267 errors:0 dropped:0 overruns:0 frame:0
49 TX packets:27449267 errors:0 dropped:0 overruns:0 carrier:0
50 collisions:0 txqueuelen:0 \n\
51 RX bytes:192643017823 (179.4 GiB) TX bytes:192643017823 (179.4 GiB)
53 wlan0 Link encap:Ethernet HWaddr 90:f6:52:27:15:0a \n\
54 inet addr:192.168.0.2 Bcast:192.168.0.255 Mask:255.255.255.0
55 inet6 addr: fe80::92f6:52ff:fe27:150a/64 Scope:Link
56 UP BROADCAST RUNNING MULTICAST MTU:1500 Metric:1
57 RX packets:12352750 errors:0 dropped:0 overruns:0 frame:0
58 TX packets:4501451 errors:0 dropped:0 overruns:0 carrier:0
59 collisions:0 txqueuelen:1000 \n\
60 RX bytes:3916475942 (3.6 GiB) TX bytes:458353654 (437.1 MiB)
63 # This is actually from a VirtualBox VM running XP.
64 MOCK_ROUTE_OUTPUT = """\
65 ===========================================================================
67 0x1 ........................... MS TCP Loopback interface
68 0x2 ...08 00 27 c3 80 ad ...... AMD PCNET Family PCI Ethernet Adapter - Packet Scheduler Miniport
69 ===========================================================================
70 ===========================================================================
72 Network Destination Netmask Gateway Interface Metric
73 0.0.0.0 0.0.0.0 10.0.2.2 10.0.2.15 20
74 10.0.2.0 255.255.255.0 10.0.2.15 10.0.2.15 20
75 10.0.2.15 255.255.255.255 127.0.0.1 127.0.0.1 20
76 10.255.255.255 255.255.255.255 10.0.2.15 10.0.2.15 20
77 127.0.0.0 255.0.0.0 127.0.0.1 127.0.0.1 1
78 224.0.0.0 240.0.0.0 10.0.2.15 10.0.2.15 20
79 255.255.255.255 255.255.255.255 10.0.2.15 10.0.2.15 1
80 Default Gateway: 10.0.2.2
81 ===========================================================================
86 UNIX_TEST_ADDRESSES = set(["127.0.0.1", "192.168.0.6", "192.168.0.2", "192.168.0.10"])
87 WINDOWS_TEST_ADDRESSES = set(["127.0.0.1", "10.0.2.15", "192.168.0.10"])
88 CYGWIN_TEST_ADDRESSES = set(["127.0.0.1", "192.168.0.10"])
92 def __init__(self, output, err):
95 def communicate(self):
96 return (self.output, self.err)
99 class ListAddresses(testutil.SignalMixin, unittest.TestCase):
100 def test_get_local_ip_for(self):
101 addr = iputil.get_local_ip_for('127.0.0.1')
102 self.failUnless(DOTTED_QUAD_RE.match(addr))
104 def test_list_async(self):
105 d = iputil.get_local_addresses_async()
106 def _check(addresses):
107 self.failUnlessIn("127.0.0.1", addresses)
108 self.failIfIn("0.0.0.0", addresses)
109 d.addCallbacks(_check)
111 # David A.'s OpenSolaris box timed out on this test one time when it was at 2s.
112 test_list_async.timeout=4
114 def _test_list_async_mock(self, command, output, expected):
118 def call_Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None,
119 preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None,
120 universal_newlines=False, startupinfo=None, creationflags=0):
124 e.errno = errno.EINTR
126 elif os.path.basename(args[0]) == command:
127 return FakeProcess(output, "")
129 e = OSError("[Errno 2] No such file or directory")
130 e.errno = errno.ENOENT
132 self.patch(subprocess, 'Popen', call_Popen)
133 self.patch(os.path, 'isfile', lambda x: True)
135 def call_get_local_ip_for(target):
136 if target in ("localhost", "127.0.0.1"):
139 return "192.168.0.10"
140 self.patch(iputil, 'get_local_ip_for', call_get_local_ip_for)
142 def call_which(name):
144 self.patch(iputil, 'which', call_which)
146 d = iputil.get_local_addresses_async()
147 def _check(addresses):
148 self.failUnlessEquals(set(addresses), set(expected))
149 d.addCallbacks(_check)
152 def test_list_async_mock_ip_addr(self):
153 self.patch(iputil, 'platform', "linux2")
154 return self._test_list_async_mock("ip", MOCK_IPADDR_OUTPUT, UNIX_TEST_ADDRESSES)
156 def test_list_async_mock_ifconfig(self):
157 self.patch(iputil, 'platform', "linux2")
158 return self._test_list_async_mock("ifconfig", MOCK_IFCONFIG_OUTPUT, UNIX_TEST_ADDRESSES)
160 def test_list_async_mock_route(self):
161 self.patch(iputil, 'platform', "win32")
162 return self._test_list_async_mock("route.exe", MOCK_ROUTE_OUTPUT, WINDOWS_TEST_ADDRESSES)
164 def test_list_async_mock_cygwin(self):
165 self.patch(iputil, 'platform', "cygwin")
166 return self._test_list_async_mock(None, None, CYGWIN_TEST_ADDRESSES)