2 def foo(): pass # must stay on file line 2: HumanReadable's test_repr expects hr(foo) to report "test_util.py:2"
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import failure
9 from allmydata.util import base32, bencode, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, testutil, deferredutil
13 class Base32(unittest.TestCase):
15 self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
def test_b2a_or_none(self):
    # base32.b2a_or_none is checked with None (expected back unchanged)
    # and with a two-byte string (expected to come back as "ci2a").
    encode = base32.b2a_or_none
    for raw, expected in [(None, None), ("\x12\x34", "ci2a")]:
        self.failUnlessEqual(encode(raw), expected)
20 self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
21 self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
23 class IDLib(unittest.TestCase):
def test_nodeid_b2a(self):
    # Twenty zero bytes are expected to render as thirty-two "a" characters.
    zero_nodeid = "\x00" * 20
    rendered = idlib.nodeid_b2a(zero_nodeid)
    self.failUnlessEqual(rendered, "a" * 32)
27 class NoArgumentException(Exception):
31 class HumanReadable(unittest.TestCase):
34 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
35 self.failUnlessEqual(hr(self.test_repr),
36 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
37 self.failUnlessEqual(hr(1L), "1")
38 self.failUnlessEqual(hr(10**40),
39 "100000000000000000...000000000000000000")
40 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
41 self.failUnlessEqual(hr([1,2]), "[1, 2]")
42 self.failUnlessEqual(hr({1:2}), "{1:2}")
47 hr(e) == "<RuntimeError: ()>" # python-2.4
48 or hr(e) == "RuntimeError()") # python-2.5
50 raise RuntimeError("oops")
53 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
54 or hr(e) == "RuntimeError('oops',)") # python-2.5
56 raise NoArgumentException
59 hr(e) == "<NoArgumentException>" # python-2.4
60 or hr(e) == "NoArgumentException()") # python-2.5
66 class Bencode(unittest.TestCase):
67 def test_bencode(self):
69 self.failUnlessEqual(e(4), "i4e")
70 self.failUnlessEqual(e([1,2]), "li1ei2ee")
71 self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
72 self.failUnlessEqual(e({1:2}), "di1ei2ee")
73 self.failUnlessEqual(e(u"a"), "u1:a")
74 self.failUnlessEqual(e([True,False]), "lb1b0e")
75 self.failUnlessEqual(e(1.5), "f1.5e")
76 self.failUnlessEqual(e("foo"), "3:foo")
78 self.failUnlessEqual(d("li1ei2ee"), [1,2])
79 self.failUnlessEqual(d("u1:a"), u"a")
80 self.failUnlessRaises(ValueError, d, "u10:short")
81 self.failUnlessEqual(d("lb1b0e"), [True,False])
82 self.failUnlessRaises(ValueError, d, "b2")
83 self.failUnlessEqual(d("f1.5e"), 1.5)
84 self.failUnlessEqual(d("3:foo"), "foo")
85 self.failUnlessRaises(ValueError, d,
86 "38:When doing layout, always plan ah")
87 # ooh! fascinating! bdecode requires string keys! I think this ought
89 #self.failUnlessEqual(d("di1ei2ee"), {1:2})
90 self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
91 self.failUnlessRaises(ValueError, d, "di1ei2ee")
92 self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
94 self.failUnlessRaises(ValueError, d, "i1ei2e")
96 # now run all the module's builtin tests
97 bencode.test_decode_raw_string()
98 bencode.test_encode_and_decode_unicode_results_in_unicode_type()
99 bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
100 bencode.test_dict_forbids_non_string_key()
101 bencode.test_dict_forbids_key_repeat()
102 bencode.test_empty_dict()
103 bencode.test_dict_allows_unicode_keys()
104 bencode.test_ValueError_in_decode_unknown()
105 bencode.test_encode_and_decode_none()
106 bencode.test_encode_and_decode_long()
107 bencode.test_encode_and_decode_int()
108 bencode.test_encode_and_decode_float()
109 bencode.test_encode_and_decode_bool()
110 #bencode.test_decode_noncanonical_int()
111 bencode.test_encode_and_decode_dict()
112 bencode.test_encode_and_decode_list()
113 bencode.test_encode_and_decode_tuple()
114 bencode.test_encode_and_decode_empty_dict()
115 bencode.test_encode_and_decode_complex_object()
116 bencode.test_unfinished_list()
117 bencode.test_unfinished_dict()
118 bencode.test_unsupported_type()
120 class Math(unittest.TestCase):
def test_div_ceil(self):
    # Compare mathutil.div_ceil with a table of (n, d, expected) triples,
    # checked in order so the first mismatch is the one reported.
    div_ceil = mathutil.div_ceil
    table = [
        (0, 1, 0),
        (0, 2, 0),
        (0, 3, 0),
        (1, 3, 1),
        (2, 3, 1),
        (3, 3, 1),
        (4, 3, 2),
        (5, 3, 2),
        (6, 3, 2),
        (7, 3, 3),
    ]
    for n, d, expected in table:
        self.failUnlessEqual(div_ceil(n, d), expected)
def test_next_multiple(self):
    # Compare mathutil.next_multiple with a table of (n, k, expected)
    # triples, checked in order so the first mismatch is the one reported.
    next_multiple = mathutil.next_multiple
    table = [
        (5, 1, 5),
        (5, 2, 6),
        (5, 3, 6),
        (5, 4, 8),
        (5, 5, 5),
        (5, 6, 6),
        (32, 1, 32),
        (32, 2, 32),
        (32, 3, 33),
        (32, 4, 32),
        (32, 5, 35),
        (32, 6, 36),
        (32, 7, 35),
        (32, 8, 32),
        (32, 9, 36),
        (32, 10, 40),
        (32, 11, 33),
        (32, 12, 36),
        (32, 13, 39),
        (32, 14, 42),
        (32, 15, 45),
        (32, 16, 32),
        (32, 17, 34),
        (32, 18, 36),
        (32, 589, 589),
    ]
    for n, k, expected in table:
        self.failUnlessEqual(next_multiple(n, k), expected)
def test_pad_size(self):
    # Compare mathutil.pad_size(n, 4) with the expected value for each
    # n from 0 through 5, in ascending order.
    pad_size = mathutil.pad_size
    expected_by_n = [0, 3, 2, 1, 0, 3]
    for n, expected in enumerate(expected_by_n):
        self.failUnlessEqual(pad_size(n, 4), expected)
171 def test_is_power_of_k(self):
172 f = mathutil.is_power_of_k
173 for i in range(1, 100):
174 if i in (1, 2, 4, 8, 16, 32, 64):
175 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
177 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
178 for i in range(1, 100):
179 if i in (1, 3, 9, 27, 81):
180 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
182 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
def test_next_power_of_k(self):
    # Check mathutil.next_power_of_k for k=2 and then k=3: first a few
    # individual inputs, then whole runs of inputs that share one answer.
    next_power = mathutil.next_power_of_k

    def check_points(k, points):
        # points: (n, expected) pairs compared without a failure message.
        for n, expected in points:
            self.failUnlessEqual(next_power(n, k), expected)

    def check_spans(k, spans):
        # spans: (start, stop, expected); every n in range(start, stop)
        # must give `expected`, and n is used as the failure message.
        for start, stop, expected in spans:
            for n in range(start, stop):
                self.failUnlessEqual(next_power(n, k), expected, "%d" % n)

    # NOTE(review): range() excludes its stop value, so the exact powers
    # 8, 16, 32, 64 (k=2) and 9, 27, 81 (k=3) are not exercised here.
    check_points(2, [(0, 1), (1, 1), (2, 2), (3, 4), (4, 4)])
    check_spans(2, [
        (5, 8, 8),
        (9, 16, 16),
        (17, 32, 32),
        (33, 64, 64),
        (65, 100, 128),
    ])

    check_points(3, [(0, 1), (1, 1), (2, 3), (3, 3)])
    check_spans(3, [
        (4, 9, 9),
        (10, 27, 27),
        (28, 81, 81),
        (82, 200, 243),
    ])
208 self.failUnlessEqual(f([1,2,3]), 2)
209 self.failUnlessEqual(f([0,0,0,4]), 1)
210 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
213 class Asserts(unittest.TestCase):
214 def should_assert(self, func, *args, **kwargs):
216 func(*args, **kwargs)
217 except AssertionError, e:
220 self.fail("assert failed with non-AssertionError: %s" % e)
221 self.fail("assert was not caught")
223 def should_not_assert(self, func, *args, **kwargs):
225 regexp = kwargs["re"]
228 func(*args, **kwargs)
229 except AssertionError, e:
230 self.fail("assertion fired when it should not have: %s" % e)
232 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
def test_assert(self):
    # Drive assertutil._assert through the should_assert and
    # should_not_assert helpers of this test case.
    check = assertutil._assert

    # No argument and an explicit False go through should_assert;
    # True goes through should_not_assert.
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # This first comparison puts the reported text first and also passes
    # it along as the failure message.
    reported = self.should_assert(check, False, "message")
    self.failUnlessEqual(reported, "'message' <type 'str'>", reported)

    # Remaining cases: (positional args, keyword args, expected text).
    cases = [
        ((False, "message1"), {"othermsg": 12},
         "'message1' <type 'str'>, othermsg: 12 <type 'int'>"),
        ((False,), {"othermsg": "message2"},
         "othermsg: 'message2' <type 'str'>"),
    ]
    for args, kwargs, expected in cases:
        reported = self.should_assert(check, *args, **kwargs)
        self.failUnlessEqual(expected, reported)
def test_precondition(self):
    # Drive assertutil.precondition through the should_assert and
    # should_not_assert helpers of this test case.
    check = assertutil.precondition

    # No argument and an explicit False go through should_assert;
    # True goes through should_not_assert.
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # Message cases: (positional args, keyword args, expected text).
    cases = [
        ((False, "message"), {},
         "precondition: 'message' <type 'str'>"),
        ((False, "message1"), {"othermsg": 12},
         "precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>"),
        ((False,), {"othermsg": "message2"},
         "precondition: othermsg: 'message2' <type 'str'>"),
    ]
    for args, kwargs, expected in cases:
        reported = self.should_assert(check, *args, **kwargs)
        self.failUnlessEqual(expected, reported)
def test_postcondition(self):
    # Drive assertutil.postcondition through the should_assert and
    # should_not_assert helpers of this test case.
    check = assertutil.postcondition

    # No argument and an explicit False go through should_assert;
    # True goes through should_not_assert.
    self.should_assert(check)
    self.should_assert(check, False)
    self.should_not_assert(check, True)

    # Message cases: (positional args, keyword args, expected text).
    cases = [
        ((False, "message"), {},
         "postcondition: 'message' <type 'str'>"),
        ((False, "message1"), {"othermsg": 12},
         "postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>"),
        ((False,), {"othermsg": "message2"},
         "postcondition: othermsg: 'message2' <type 'str'>"),
    ]
    for args, kwargs, expected in cases:
        reported = self.should_assert(check, *args, **kwargs)
        self.failUnlessEqual(expected, reported)
275 class FileUtil(unittest.TestCase):
276 def mkdir(self, basedir, path, mode=0777):
277 fn = os.path.join(basedir, path)
278 fileutil.make_dirs(fn, mode)
280 def touch(self, basedir, path, mode=None, data="touch\n"):
281 fn = os.path.join(basedir, path)
288 def test_rm_dir(self):
289 basedir = "util/FileUtil/test_rm_dir"
290 fileutil.make_dirs(basedir)
291 # create it again to test idempotency
292 fileutil.make_dirs(basedir)
293 d = os.path.join(basedir, "doomed")
295 self.touch(d, "a/b/1.txt")
296 self.touch(d, "a/b/2.txt", 0444)
297 self.touch(d, "a/b/3.txt", 0)
299 self.touch(d, "a/c/1.txt")
300 self.touch(d, "a/c/2.txt", 0444)
301 self.touch(d, "a/c/3.txt", 0)
302 os.chmod(os.path.join(d, "a/c"), 0444)
304 self.touch(d, "a/d/1.txt")
305 self.touch(d, "a/d/2.txt", 0444)
306 self.touch(d, "a/d/3.txt", 0)
307 os.chmod(os.path.join(d, "a/d"), 0)
310 self.failIf(os.path.exists(d))
311 # remove it again to test idempotency
def test_remove_if_possible(self):
    # fileutil.remove_if_possible is called three times on the same path:
    # while the file exists, after it is gone, and after the whole
    # directory has been removed.
    workdir = "util/FileUtil/test_remove_if_possible"
    target = os.path.join(workdir, "here")

    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")

    # First call: the file is there and must be gone afterwards.
    fileutil.remove_if_possible(target)
    self.failIf(os.path.exists(target))

    # Second call: should be idempotent.
    fileutil.remove_if_possible(target)

    # Third call, once the containing directory is gone: should survive errors.
    fileutil.rm_dir(workdir)
    fileutil.remove_if_possible(target)
325 def test_open_or_create(self):
326 basedir = "util/FileUtil/test_open_or_create"
327 fileutil.make_dirs(basedir)
328 fn = os.path.join(basedir, "here")
329 f = fileutil.open_or_create(fn)
332 f = fileutil.open_or_create(fn)
339 self.failUnlessEqual(data, "stuff.more.")
341 def test_NamedTemporaryDirectory(self):
342 basedir = "util/FileUtil/test_NamedTemporaryDirectory"
343 fileutil.make_dirs(basedir)
344 td = fileutil.NamedTemporaryDirectory(dir=basedir)
346 self.failUnless(basedir in name)
347 self.failUnless(basedir in repr(td))
348 self.failUnless(os.path.isdir(name))
350 # it is conceivable that we need to force gc here, but I'm not sure
351 self.failIf(os.path.isdir(name))
def test_rename(self):
    # After fileutil.rename the old path must not exist and the new
    # path must exist.
    workdir = "util/FileUtil/test_rename"
    source = os.path.join(workdir, "here")
    destination = os.path.join(workdir, "there")

    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")

    fileutil.rename(source, destination)

    self.failIf(os.path.exists(source))
    self.failUnless(os.path.exists(destination))
364 basedir = "util/FileUtil/test_du"
365 fileutil.make_dirs(basedir)
366 d = os.path.join(basedir, "space-consuming")
368 self.touch(d, "a/b/1.txt", data="a"*10)
369 self.touch(d, "a/b/2.txt", data="b"*11)
371 self.touch(d, "a/c/1.txt", data="c"*12)
372 self.touch(d, "a/c/2.txt", data="d"*13)
374 used = fileutil.du(basedir)
375 self.failUnlessEqual(10+11+12+13, used)
377 class PollMixinTests(unittest.TestCase):
379 self.pm = testutil.PollMixin()
381 def test_PollMixin_True(self):
382 d = self.pm.poll(check_f=lambda : True,
386 def test_PollMixin_False_then_True(self):
387 i = iter([False, True])
388 d = self.pm.poll(check_f=i.next,
392 def test_timeout(self):
393 d = self.pm.poll(check_f=lambda: False,
397 self.fail("poll should have failed, not returned %s" % (res,))
399 f.trap(testutil.TimeoutError)
400 return None # success
401 d.addCallbacks(_suc, _err)
404 class DeferredUtilTests(unittest.TestCase):
405 def test_success(self):
406 d1, d2 = defer.Deferred(), defer.Deferred()
409 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
410 dlss.addCallbacks(good.append, bad.append)
413 self.failUnlessEqual(good, [[1,2]])
414 self.failUnlessEqual(bad, [])
416 def test_failure(self):
417 d1, d2 = defer.Deferred(), defer.Deferred()
420 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
421 dlss.addCallbacks(good.append, bad.append)
422 d1.addErrback(lambda _ignore: None)
423 d2.addErrback(lambda _ignore: None)
425 d2.errback(RuntimeError())
426 self.failUnlessEqual(good, [])
427 self.failUnlessEqual(len(bad), 1)
429 self.failUnless(isinstance(f, failure.Failure))
430 self.failUnless(f.check(RuntimeError))
432 class HashUtilTests(unittest.TestCase):
433 def test_sha256d(self):
434 h1 = hashutil.tagged_hash_256d("tag1", "value")
435 h2 = hashutil.tagged_hasher_256d("tag1")
438 self.failUnlessEqual(h1, h2)
440 def test_sha256d_truncated(self):
441 h1 = hashutil.tagged_hash_256d("tag1", "value", 16)
442 h2 = hashutil.tagged_hasher_256d("tag1", 16)
445 self.failUnlessEqual(len(h1), 16)
446 self.failUnlessEqual(len(h2), 16)
447 self.failUnlessEqual(h1, h2)
450 h1 = hashutil.content_hash_key_hash(3, 10, 1000, "data")
451 h2 = hashutil.content_hash_key_hasher(3, 10, 1000)
454 self.failUnlessEqual(h1, h2)