2 def foo(): pass # keep the line number constant
5 from twisted.trial import unittest
7 from allmydata.util import bencode, idlib, humanreadable, mathutil
8 from allmydata.util import assertutil, fileutil, testutil
11 class IDLib(unittest.TestCase):
13 self.failUnlessEqual(idlib.b2a("\x12\x34"), "ne4y")
def test_b2a_or_none(self):
    # None must come back as None; a two-byte string must come back in
    # its encoded form.
    encode = idlib.b2a_or_none
    self.failUnlessEqual(encode(None), None)
    self.failUnlessEqual(encode("\x12\x34"), "ne4y")
18 self.failUnlessEqual(idlib.a2b("ne4y"), "\x12\x34")
19 self.failUnlessRaises(AssertionError, idlib.a2b, "b0gus")
def test_nodeid_b2a(self):
    # Twenty zero bytes are expected to render as thirty-two "a" characters.
    all_zero_nodeid = "\x00" * 20
    expected = "a" * 32
    self.failUnlessEqual(idlib.nodeid_b2a(all_zero_nodeid), expected)
23 class NoArgumentException(Exception):
27 class HumanReadable(unittest.TestCase):
30 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
31 self.failUnlessEqual(hr(self.test_repr),
32 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
33 self.failUnlessEqual(hr(1L), "1")
34 self.failUnlessEqual(hr(10**40),
35 "100000000000000000...000000000000000000")
36 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
37 self.failUnlessEqual(hr([1,2]), "[1, 2]")
38 self.failUnlessEqual(hr({1:2}), "{1:2}")
43 hr(e) == "<RuntimeError: ()>" # python-2.4
44 or hr(e) == "RuntimeError()") # python-2.5
46 raise RuntimeError("oops")
49 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
50 or hr(e) == "RuntimeError('oops',)") # python-2.5
52 raise NoArgumentException
55 hr(e) == "<NoArgumentException>" # python-2.4
56 or hr(e) == "NoArgumentException()") # python-2.5
62 class Bencode(unittest.TestCase):
63 def test_bencode(self):
65 self.failUnlessEqual(e(4), "i4e")
66 self.failUnlessEqual(e([1,2]), "li1ei2ee")
67 self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
68 self.failUnlessEqual(e({1:2}), "di1ei2ee")
69 self.failUnlessEqual(e(u"a"), "u1:a")
70 self.failUnlessEqual(e([True,False]), "lb1b0e")
71 self.failUnlessEqual(e(1.5), "f1.5e")
72 self.failUnlessEqual(e("foo"), "3:foo")
74 self.failUnlessEqual(d("li1ei2ee"), [1,2])
75 self.failUnlessEqual(d("u1:a"), u"a")
76 self.failUnlessRaises(ValueError, d, "u10:short")
77 self.failUnlessEqual(d("lb1b0e"), [True,False])
78 self.failUnlessRaises(ValueError, d, "b2")
79 self.failUnlessEqual(d("f1.5e"), 1.5)
80 self.failUnlessEqual(d("3:foo"), "foo")
81 self.failUnlessRaises(ValueError, d,
82 "38:When doing layout, always plan ah")
83 # ooh! fascinating! bdecode requires string keys! I think this ought
85 #self.failUnlessEqual(d("di1ei2ee"), {1:2})
86 self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
87 self.failUnlessRaises(ValueError, d, "di1ei2ee")
88 self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
90 self.failUnlessRaises(ValueError, d, "i1ei2e")
92 # now run all the module's builtin tests
93 bencode.test_decode_raw_string()
94 bencode.test_encode_and_decode_unicode_results_in_unicode_type()
95 bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
96 bencode.test_dict_forbids_non_string_key()
97 bencode.test_dict_forbids_key_repeat()
98 bencode.test_empty_dict()
99 bencode.test_dict_allows_unicode_keys()
100 bencode.test_ValueError_in_decode_unknown()
101 bencode.test_encode_and_decode_none()
102 bencode.test_encode_and_decode_long()
103 bencode.test_encode_and_decode_int()
104 bencode.test_encode_and_decode_float()
105 bencode.test_encode_and_decode_bool()
106 #bencode.test_decode_noncanonical_int()
107 bencode.test_encode_and_decode_dict()
108 bencode.test_encode_and_decode_list()
109 bencode.test_encode_and_decode_tuple()
110 bencode.test_encode_and_decode_empty_dict()
111 bencode.test_encode_and_decode_complex_object()
112 bencode.test_unfinished_list()
113 bencode.test_unfinished_dict()
114 bencode.test_unsupported_type()
116 class Math(unittest.TestCase):
def test_div_ceil(self):
    # Each expectation below is the quotient rounded up to a whole number.
    div_ceil = mathutil.div_ceil
    check = self.failUnlessEqual

    # A zero numerator yields zero whatever the divisor.
    check(div_ceil(0, 1), 0)
    check(div_ceil(0, 2), 0)
    check(div_ceil(0, 3), 0)

    # Walk the numerator upward across several multiples of three.
    check(div_ceil(1, 3), 1)
    check(div_ceil(2, 3), 1)
    check(div_ceil(3, 3), 1)
    check(div_ceil(4, 3), 2)
    check(div_ceil(5, 3), 2)
    check(div_ceil(6, 3), 2)
    check(div_ceil(7, 3), 3)
def test_next_multiple(self):
    # Each expectation is the smallest multiple of the second argument
    # that is not below the first argument.
    next_multiple = mathutil.next_multiple
    check = self.failUnlessEqual

    # Small starting value.
    check(next_multiple(5, 1), 5)
    check(next_multiple(5, 2), 6)
    check(next_multiple(5, 3), 6)
    check(next_multiple(5, 4), 8)
    check(next_multiple(5, 5), 5)
    check(next_multiple(5, 6), 6)

    # Starting value 32 against every step from 1 to 18.
    check(next_multiple(32, 1), 32)
    check(next_multiple(32, 2), 32)
    check(next_multiple(32, 3), 33)
    check(next_multiple(32, 4), 32)
    check(next_multiple(32, 5), 35)
    check(next_multiple(32, 6), 36)
    check(next_multiple(32, 7), 35)
    check(next_multiple(32, 8), 32)
    check(next_multiple(32, 9), 36)
    check(next_multiple(32, 10), 40)
    check(next_multiple(32, 11), 33)
    check(next_multiple(32, 12), 36)
    check(next_multiple(32, 13), 39)
    check(next_multiple(32, 14), 42)
    check(next_multiple(32, 15), 45)
    check(next_multiple(32, 16), 32)
    check(next_multiple(32, 17), 34)
    check(next_multiple(32, 18), 36)

    # A step larger than the starting value.
    check(next_multiple(32, 589), 589)
def test_pad_size(self):
    # Expected results are the amounts needed to bring the first argument
    # up to a multiple of four (zero when it already is one).
    pad_size = mathutil.pad_size
    check = self.failUnlessEqual

    check(pad_size(0, 4), 0)
    check(pad_size(1, 4), 3)
    check(pad_size(2, 4), 2)
    check(pad_size(3, 4), 1)
    check(pad_size(4, 4), 0)
    check(pad_size(5, 4), 3)
167 def test_is_power_of_k(self):
168 f = mathutil.is_power_of_k
169 for i in range(1, 100):
170 if i in (1, 2, 4, 8, 16, 32, 64):
171 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
173 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
174 for i in range(1, 100):
175 if i in (1, 3, 9, 27, 81):
176 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
178 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
def test_next_power_of_k(self):
    # The expectations treat next_power_of_k(n, k) as the smallest power
    # of k that is not below n: an exact power maps to itself
    # (f(4,2) == 4, f(3,3) == 3) and anything in between rounds up.
    #
    # range() excludes its upper bound, so each loop's stop value is one
    # past the exact power it ends on. That way 8, 16, 32, 64 (base 2)
    # and 9, 27, 81 (base 3) are themselves checked.
    f = mathutil.next_power_of_k

    # base 2
    self.failUnlessEqual(f(0,2), 1)
    self.failUnlessEqual(f(1,2), 1)
    self.failUnlessEqual(f(2,2), 2)
    self.failUnlessEqual(f(3,2), 4)
    self.failUnlessEqual(f(4,2), 4)
    for i in range(5, 9):
        self.failUnlessEqual(f(i,2), 8, "%d" % i)
    for i in range(9, 17):
        self.failUnlessEqual(f(i,2), 16, "%d" % i)
    for i in range(17, 33):
        self.failUnlessEqual(f(i,2), 32, "%d" % i)
    for i in range(33, 65):
        self.failUnlessEqual(f(i,2), 64, "%d" % i)
    for i in range(65, 100):
        self.failUnlessEqual(f(i,2), 128, "%d" % i)

    # base 3
    self.failUnlessEqual(f(0,3), 1)
    self.failUnlessEqual(f(1,3), 1)
    self.failUnlessEqual(f(2,3), 3)
    self.failUnlessEqual(f(3,3), 3)
    for i in range(4, 10):
        self.failUnlessEqual(f(i,3), 9, "%d" % i)
    for i in range(10, 28):
        self.failUnlessEqual(f(i,3), 27, "%d" % i)
    for i in range(28, 82):
        self.failUnlessEqual(f(i,3), 81, "%d" % i)
    for i in range(82, 200):
        self.failUnlessEqual(f(i,3), 243, "%d" % i)
204 self.failUnlessEqual(f([1,2,3]), 2)
205 self.failUnlessEqual(f([0,0,0,4]), 1)
206 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
209 class Asserts(unittest.TestCase):
210 def should_assert(self, func, *args, **kwargs):
212 func(*args, **kwargs)
213 except AssertionError, e:
216 self.fail("assert failed with non-AssertionError: %s" % e)
217 self.fail("assert was not caught")
219 def should_not_assert(self, func, *args, **kwargs):
221 regexp = kwargs["re"]
224 func(*args, **kwargs)
225 except AssertionError, e:
226 self.fail("assertion fired when it should not have: %s" % e)
228 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
def test_assert(self):
    # A missing or false condition must trip the assertion; a true one
    # must not.
    check = assertutil._assert
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # Compare what should_assert hands back against the expected
    # rendering of the positional and keyword arguments.
    text = self.should_assert(check, False, "message")
    self.failUnlessEqual(text, "'message' <type 'str'>", text)
    text = self.should_assert(check, False, "message1", othermsg=12)
    self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", text)
    text = self.should_assert(check, False, othermsg="message2")
    self.failUnlessEqual("othermsg: 'message2' <type 'str'>", text)
def test_precondition(self):
    # A missing or false condition must trip the assertion; a true one
    # must not.
    check = assertutil.precondition
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # The expected text carries a "precondition: " prefix ahead of the
    # rendered arguments.
    text = self.should_assert(check, False, "message")
    self.failUnlessEqual("precondition: 'message' <type 'str'>", text)
    text = self.should_assert(check, False, "message1", othermsg=12)
    self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", text)
    text = self.should_assert(check, False, othermsg="message2")
    self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", text)
def test_postcondition(self):
    # A missing or false condition must trip the assertion; a true one
    # must not.
    check = assertutil.postcondition
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # The expected text carries a "postcondition: " prefix ahead of the
    # rendered arguments.
    text = self.should_assert(check, False, "message")
    self.failUnlessEqual("postcondition: 'message' <type 'str'>", text)
    text = self.should_assert(check, False, "message1", othermsg=12)
    self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", text)
    text = self.should_assert(check, False, othermsg="message2")
    self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", text)
271 class FileUtil(unittest.TestCase):
272 def mkdir(self, basedir, path, mode=0777):
273 fn = os.path.join(basedir, path)
274 fileutil.make_dirs(fn, mode)
276 def touch(self, basedir, path, mode=None, data="touch\n"):
277 fn = os.path.join(basedir, path)
284 def test_rm_dir(self):
285 basedir = "util/FileUtil/test_rm_dir"
286 fileutil.make_dirs(basedir)
287 # create it again to test idempotency
288 fileutil.make_dirs(basedir)
289 d = os.path.join(basedir, "doomed")
291 self.touch(d, "a/b/1.txt")
292 self.touch(d, "a/b/2.txt", 0444)
293 self.touch(d, "a/b/3.txt", 0)
295 self.touch(d, "a/c/1.txt")
296 self.touch(d, "a/c/2.txt", 0444)
297 self.touch(d, "a/c/3.txt", 0)
298 os.chmod(os.path.join(d, "a/c"), 0444)
300 self.touch(d, "a/d/1.txt")
301 self.touch(d, "a/d/2.txt", 0444)
302 self.touch(d, "a/d/3.txt", 0)
303 os.chmod(os.path.join(d, "a/d"), 0)
306 self.failIf(os.path.exists(d))
307 # remove it again to test idempotency
def test_remove_if_possible(self):
    # Remove a file, then repeat the call once with the file already gone
    # and once after its whole parent directory has been deleted.
    workdir = "util/FileUtil/test_remove_if_possible"
    target = os.path.join(workdir, "here")
    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")

    fileutil.remove_if_possible(target)
    self.failIf(os.path.exists(target))

    fileutil.remove_if_possible(target) # should be idempotent

    fileutil.rm_dir(workdir)
    fileutil.remove_if_possible(target) # should survive errors
321 def test_open_or_create(self):
322 basedir = "util/FileUtil/test_open_or_create"
323 fileutil.make_dirs(basedir)
324 fn = os.path.join(basedir, "here")
325 f = fileutil.open_or_create(fn)
328 f = fileutil.open_or_create(fn)
335 self.failUnlessEqual(data, "stuff.more.")
337 def test_NamedTemporaryDirectory(self):
338 basedir = "util/FileUtil/test_NamedTemporaryDirectory"
339 fileutil.make_dirs(basedir)
340 td = fileutil.NamedTemporaryDirectory(dir=basedir)
342 self.failUnless(basedir in name)
343 self.failUnless(basedir in repr(td))
344 self.failUnless(os.path.isdir(name))
346 # it is conceivable that we need to force gc here, but I'm not sure
347 self.failIf(os.path.isdir(name))
def test_rename(self):
    # After fileutil.rename the old name must be gone and the new name
    # must exist.
    workdir = "util/FileUtil/test_rename"
    source = os.path.join(workdir, "here")
    destination = os.path.join(workdir, "there")
    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")

    fileutil.rename(source, destination)

    self.failIf(os.path.exists(source))
    self.failUnless(os.path.exists(destination))
360 basedir = "util/FileUtil/test_du"
361 fileutil.make_dirs(basedir)
362 d = os.path.join(basedir, "space-consuming")
364 self.touch(d, "a/b/1.txt", data="a"*10)
365 self.touch(d, "a/b/2.txt", data="b"*11)
367 self.touch(d, "a/c/1.txt", data="c"*12)
368 self.touch(d, "a/c/2.txt", data="d"*13)
370 used = fileutil.du(basedir)
371 self.failUnlessEqual(10+11+12+13, used)
373 class PollMixinTests(unittest.TestCase):
375 self.pm = testutil.PollMixin()
378 def fail_unless_arg_is_true(arg):
379 self.failUnless(arg is True, repr(arg))
380 d.addCallback(fail_unless_arg_is_true)
383 def test_PollMixin_True(self):
384 d = self.pm.poll(check_f=lambda : True,
386 return self._check(d)
388 def test_PollMixin_False_then_True(self):
389 i = iter([False, True])
390 d = self.pm.poll(check_f=i.next,
392 return self._check(d)