2 def foo(): pass # keep the line number constant
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import failure
9 from allmydata.util import bencode, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, testutil, deferredutil
13 class IDLib(unittest.TestCase):
15 self.failUnlessEqual(idlib.b2a("\x12\x34"), "ne4y")
16 def test_b2a_or_none(self):
17 self.failUnlessEqual(idlib.b2a_or_none(None), None)
18 self.failUnlessEqual(idlib.b2a_or_none("\x12\x34"), "ne4y")
20 self.failUnlessEqual(idlib.a2b("ne4y"), "\x12\x34")
21 self.failUnlessRaises(AssertionError, idlib.a2b, "b0gus")
22 def test_nodeid_b2a(self):
23 self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
25 class NoArgumentException(Exception):
29 class HumanReadable(unittest.TestCase):
32 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
33 self.failUnlessEqual(hr(self.test_repr),
34 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
35 self.failUnlessEqual(hr(1L), "1")
36 self.failUnlessEqual(hr(10**40),
37 "100000000000000000...000000000000000000")
38 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
39 self.failUnlessEqual(hr([1,2]), "[1, 2]")
40 self.failUnlessEqual(hr({1:2}), "{1:2}")
45 hr(e) == "<RuntimeError: ()>" # python-2.4
46 or hr(e) == "RuntimeError()") # python-2.5
48 raise RuntimeError("oops")
51 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
52 or hr(e) == "RuntimeError('oops',)") # python-2.5
54 raise NoArgumentException
57 hr(e) == "<NoArgumentException>" # python-2.4
58 or hr(e) == "NoArgumentException()") # python-2.5
64 class Bencode(unittest.TestCase):
65 def test_bencode(self):
67 self.failUnlessEqual(e(4), "i4e")
68 self.failUnlessEqual(e([1,2]), "li1ei2ee")
69 self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
70 self.failUnlessEqual(e({1:2}), "di1ei2ee")
71 self.failUnlessEqual(e(u"a"), "u1:a")
72 self.failUnlessEqual(e([True,False]), "lb1b0e")
73 self.failUnlessEqual(e(1.5), "f1.5e")
74 self.failUnlessEqual(e("foo"), "3:foo")
76 self.failUnlessEqual(d("li1ei2ee"), [1,2])
77 self.failUnlessEqual(d("u1:a"), u"a")
78 self.failUnlessRaises(ValueError, d, "u10:short")
79 self.failUnlessEqual(d("lb1b0e"), [True,False])
80 self.failUnlessRaises(ValueError, d, "b2")
81 self.failUnlessEqual(d("f1.5e"), 1.5)
82 self.failUnlessEqual(d("3:foo"), "foo")
83 self.failUnlessRaises(ValueError, d,
84 "38:When doing layout, always plan ah")
85 # ooh! fascinating! bdecode requires string keys! I think this ought
87 #self.failUnlessEqual(d("di1ei2ee"), {1:2})
88 self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
89 self.failUnlessRaises(ValueError, d, "di1ei2ee")
90 self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
92 self.failUnlessRaises(ValueError, d, "i1ei2e")
94 # now run all the module's builtin tests
95 bencode.test_decode_raw_string()
96 bencode.test_encode_and_decode_unicode_results_in_unicode_type()
97 bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
98 bencode.test_dict_forbids_non_string_key()
99 bencode.test_dict_forbids_key_repeat()
100 bencode.test_empty_dict()
101 bencode.test_dict_allows_unicode_keys()
102 bencode.test_ValueError_in_decode_unknown()
103 bencode.test_encode_and_decode_none()
104 bencode.test_encode_and_decode_long()
105 bencode.test_encode_and_decode_int()
106 bencode.test_encode_and_decode_float()
107 bencode.test_encode_and_decode_bool()
108 #bencode.test_decode_noncanonical_int()
109 bencode.test_encode_and_decode_dict()
110 bencode.test_encode_and_decode_list()
111 bencode.test_encode_and_decode_tuple()
112 bencode.test_encode_and_decode_empty_dict()
113 bencode.test_encode_and_decode_complex_object()
114 bencode.test_unfinished_list()
115 bencode.test_unfinished_dict()
116 bencode.test_unsupported_type()
118 class Math(unittest.TestCase):
119 def test_div_ceil(self):
120 f = mathutil.div_ceil
121 self.failUnlessEqual(f(0, 1), 0)
122 self.failUnlessEqual(f(0, 2), 0)
123 self.failUnlessEqual(f(0, 3), 0)
124 self.failUnlessEqual(f(1, 3), 1)
125 self.failUnlessEqual(f(2, 3), 1)
126 self.failUnlessEqual(f(3, 3), 1)
127 self.failUnlessEqual(f(4, 3), 2)
128 self.failUnlessEqual(f(5, 3), 2)
129 self.failUnlessEqual(f(6, 3), 2)
130 self.failUnlessEqual(f(7, 3), 3)
132 def test_next_multiple(self):
133 f = mathutil.next_multiple
134 self.failUnlessEqual(f(5, 1), 5)
135 self.failUnlessEqual(f(5, 2), 6)
136 self.failUnlessEqual(f(5, 3), 6)
137 self.failUnlessEqual(f(5, 4), 8)
138 self.failUnlessEqual(f(5, 5), 5)
139 self.failUnlessEqual(f(5, 6), 6)
140 self.failUnlessEqual(f(32, 1), 32)
141 self.failUnlessEqual(f(32, 2), 32)
142 self.failUnlessEqual(f(32, 3), 33)
143 self.failUnlessEqual(f(32, 4), 32)
144 self.failUnlessEqual(f(32, 5), 35)
145 self.failUnlessEqual(f(32, 6), 36)
146 self.failUnlessEqual(f(32, 7), 35)
147 self.failUnlessEqual(f(32, 8), 32)
148 self.failUnlessEqual(f(32, 9), 36)
149 self.failUnlessEqual(f(32, 10), 40)
150 self.failUnlessEqual(f(32, 11), 33)
151 self.failUnlessEqual(f(32, 12), 36)
152 self.failUnlessEqual(f(32, 13), 39)
153 self.failUnlessEqual(f(32, 14), 42)
154 self.failUnlessEqual(f(32, 15), 45)
155 self.failUnlessEqual(f(32, 16), 32)
156 self.failUnlessEqual(f(32, 17), 34)
157 self.failUnlessEqual(f(32, 18), 36)
158 self.failUnlessEqual(f(32, 589), 589)
160 def test_pad_size(self):
161 f = mathutil.pad_size
162 self.failUnlessEqual(f(0, 4), 0)
163 self.failUnlessEqual(f(1, 4), 3)
164 self.failUnlessEqual(f(2, 4), 2)
165 self.failUnlessEqual(f(3, 4), 1)
166 self.failUnlessEqual(f(4, 4), 0)
167 self.failUnlessEqual(f(5, 4), 3)
169 def test_is_power_of_k(self):
170 f = mathutil.is_power_of_k
171 for i in range(1, 100):
172 if i in (1, 2, 4, 8, 16, 32, 64):
173 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
175 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
176 for i in range(1, 100):
177 if i in (1, 3, 9, 27, 81):
178 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
180 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
182 def test_next_power_of_k(self):
183 f = mathutil.next_power_of_k
184 self.failUnlessEqual(f(0,2), 1)
185 self.failUnlessEqual(f(1,2), 1)
186 self.failUnlessEqual(f(2,2), 2)
187 self.failUnlessEqual(f(3,2), 4)
188 self.failUnlessEqual(f(4,2), 4)
189 for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
190 for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
191 for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
192 for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
193 for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
195 self.failUnlessEqual(f(0,3), 1)
196 self.failUnlessEqual(f(1,3), 1)
197 self.failUnlessEqual(f(2,3), 3)
198 self.failUnlessEqual(f(3,3), 3)
199 for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
200 for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
201 for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
202 for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
206 self.failUnlessEqual(f([1,2,3]), 2)
207 self.failUnlessEqual(f([0,0,0,4]), 1)
208 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
211 class Asserts(unittest.TestCase):
212 def should_assert(self, func, *args, **kwargs):
214 func(*args, **kwargs)
215 except AssertionError, e:
218 self.fail("assert failed with non-AssertionError: %s" % e)
219 self.fail("assert was not caught")
221 def should_not_assert(self, func, *args, **kwargs):
223 regexp = kwargs["re"]
226 func(*args, **kwargs)
227 except AssertionError, e:
228 self.fail("assertion fired when it should not have: %s" % e)
230 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
234 def test_assert(self):
235 f = assertutil._assert
236 self.should_assert(f)
237 self.should_assert(f, False)
238 self.should_not_assert(f, True)
240 m = self.should_assert(f, False, "message")
241 self.failUnlessEqual(m, "'message' <type 'str'>", m)
242 m = self.should_assert(f, False, "message1", othermsg=12)
243 self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
244 m = self.should_assert(f, False, othermsg="message2")
245 self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
247 def test_precondition(self):
248 f = assertutil.precondition
249 self.should_assert(f)
250 self.should_assert(f, False)
251 self.should_not_assert(f, True)
253 m = self.should_assert(f, False, "message")
254 self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
255 m = self.should_assert(f, False, "message1", othermsg=12)
256 self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
257 m = self.should_assert(f, False, othermsg="message2")
258 self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
260 def test_postcondition(self):
261 f = assertutil.postcondition
262 self.should_assert(f)
263 self.should_assert(f, False)
264 self.should_not_assert(f, True)
266 m = self.should_assert(f, False, "message")
267 self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
268 m = self.should_assert(f, False, "message1", othermsg=12)
269 self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
270 m = self.should_assert(f, False, othermsg="message2")
271 self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
273 class FileUtil(unittest.TestCase):
274 def mkdir(self, basedir, path, mode=0777):
275 fn = os.path.join(basedir, path)
276 fileutil.make_dirs(fn, mode)
278 def touch(self, basedir, path, mode=None, data="touch\n"):
279 fn = os.path.join(basedir, path)
286 def test_rm_dir(self):
287 basedir = "util/FileUtil/test_rm_dir"
288 fileutil.make_dirs(basedir)
289 # create it again to test idempotency
290 fileutil.make_dirs(basedir)
291 d = os.path.join(basedir, "doomed")
293 self.touch(d, "a/b/1.txt")
294 self.touch(d, "a/b/2.txt", 0444)
295 self.touch(d, "a/b/3.txt", 0)
297 self.touch(d, "a/c/1.txt")
298 self.touch(d, "a/c/2.txt", 0444)
299 self.touch(d, "a/c/3.txt", 0)
300 os.chmod(os.path.join(d, "a/c"), 0444)
302 self.touch(d, "a/d/1.txt")
303 self.touch(d, "a/d/2.txt", 0444)
304 self.touch(d, "a/d/3.txt", 0)
305 os.chmod(os.path.join(d, "a/d"), 0)
308 self.failIf(os.path.exists(d))
309 # remove it again to test idempotency
312 def test_remove_if_possible(self):
313 basedir = "util/FileUtil/test_remove_if_possible"
314 fileutil.make_dirs(basedir)
315 self.touch(basedir, "here")
316 fn = os.path.join(basedir, "here")
317 fileutil.remove_if_possible(fn)
318 self.failIf(os.path.exists(fn))
319 fileutil.remove_if_possible(fn) # should be idempotent
320 fileutil.rm_dir(basedir)
321 fileutil.remove_if_possible(fn) # should survive errors
323 def test_open_or_create(self):
324 basedir = "util/FileUtil/test_open_or_create"
325 fileutil.make_dirs(basedir)
326 fn = os.path.join(basedir, "here")
327 f = fileutil.open_or_create(fn)
330 f = fileutil.open_or_create(fn)
337 self.failUnlessEqual(data, "stuff.more.")
339 def test_NamedTemporaryDirectory(self):
340 basedir = "util/FileUtil/test_NamedTemporaryDirectory"
341 fileutil.make_dirs(basedir)
342 td = fileutil.NamedTemporaryDirectory(dir=basedir)
344 self.failUnless(basedir in name)
345 self.failUnless(basedir in repr(td))
346 self.failUnless(os.path.isdir(name))
348 # it is conceivable that we need to force gc here, but I'm not sure
349 self.failIf(os.path.isdir(name))
351 def test_rename(self):
352 basedir = "util/FileUtil/test_rename"
353 fileutil.make_dirs(basedir)
354 self.touch(basedir, "here")
355 fn = os.path.join(basedir, "here")
356 fn2 = os.path.join(basedir, "there")
357 fileutil.rename(fn, fn2)
358 self.failIf(os.path.exists(fn))
359 self.failUnless(os.path.exists(fn2))
362 basedir = "util/FileUtil/test_du"
363 fileutil.make_dirs(basedir)
364 d = os.path.join(basedir, "space-consuming")
366 self.touch(d, "a/b/1.txt", data="a"*10)
367 self.touch(d, "a/b/2.txt", data="b"*11)
369 self.touch(d, "a/c/1.txt", data="c"*12)
370 self.touch(d, "a/c/2.txt", data="d"*13)
372 used = fileutil.du(basedir)
373 self.failUnlessEqual(10+11+12+13, used)
375 class PollMixinTests(unittest.TestCase):
377 self.pm = testutil.PollMixin()
379 def test_PollMixin_True(self):
380 d = self.pm.poll(check_f=lambda : True,
384 def test_PollMixin_False_then_True(self):
385 i = iter([False, True])
386 d = self.pm.poll(check_f=i.next,
390 def test_timeout(self):
391 d = self.pm.poll(check_f=lambda: False,
395 self.fail("poll should have failed, not returned %s" % (res,))
397 f.trap(testutil.TimeoutError)
398 return None # success
399 d.addCallbacks(_suc, _err)
402 class DeferredUtilTests(unittest.TestCase):
403 def test_success(self):
404 d1, d2 = defer.Deferred(), defer.Deferred()
407 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
408 dlss.addCallbacks(good.append, bad.append)
411 self.failUnlessEqual(good, [[1,2]])
412 self.failUnlessEqual(bad, [])
414 def test_failure(self):
415 d1, d2 = defer.Deferred(), defer.Deferred()
418 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
419 dlss.addCallbacks(good.append, bad.append)
420 d1.addErrback(lambda _ignore: None)
421 d2.addErrback(lambda _ignore: None)
423 d2.errback(RuntimeError())
424 self.failUnlessEqual(good, [])
425 self.failUnlessEqual(len(bad), 1)
427 self.failUnless(isinstance(f, failure.Failure))
428 self.failUnless(f.check(RuntimeError))
430 class HashUtilTests(unittest.TestCase):
431 def test_sha256d(self):
432 h1 = hashutil.tagged_hash_256d("tag1", "value")
433 h2 = hashutil.tagged_hasher_256d("tag1")
436 self.failUnlessEqual(h1, h2)
438 def test_sha256d_truncated(self):
439 h1 = hashutil.tagged_hash_256d("tag1", "value", 16)
440 h2 = hashutil.tagged_hasher_256d("tag1", 16)
443 self.failUnlessEqual(len(h1), 16)
444 self.failUnlessEqual(len(h2), 16)
445 self.failUnlessEqual(h1, h2)