2 def foo(): pass # keep the line number constant
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import failure
9 from allmydata.util import bencode, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, testutil, deferredutil
13 class Base32(unittest.TestCase):
15 self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
16 def test_b2a_or_none(self):
17 self.failUnlessEqual(base32.b2a_or_none(None), None)
18 self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
20 self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
21 self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
23 class IDLib(unittest.TestCase):
24 def test_nodeid_b2a(self):
25 self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
27 class NoArgumentException(Exception):
31 class HumanReadable(unittest.TestCase):
34 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
35 self.failUnlessEqual(hr(self.test_repr),
36 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
37 self.failUnlessEqual(hr(1L), "1")
38 self.failUnlessEqual(hr(10**40),
39 "100000000000000000...000000000000000000")
40 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
41 self.failUnlessEqual(hr([1,2]), "[1, 2]")
42 self.failUnlessEqual(hr({1:2}), "{1:2}")
47 hr(e) == "<RuntimeError: ()>" # python-2.4
48 or hr(e) == "RuntimeError()") # python-2.5
50 raise RuntimeError("oops")
53 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
54 or hr(e) == "RuntimeError('oops',)") # python-2.5
56 raise NoArgumentException
59 hr(e) == "<NoArgumentException>" # python-2.4
60 or hr(e) == "NoArgumentException()") # python-2.5
66 class Math(unittest.TestCase):
67 def test_div_ceil(self):
69 self.failUnlessEqual(f(0, 1), 0)
70 self.failUnlessEqual(f(0, 2), 0)
71 self.failUnlessEqual(f(0, 3), 0)
72 self.failUnlessEqual(f(1, 3), 1)
73 self.failUnlessEqual(f(2, 3), 1)
74 self.failUnlessEqual(f(3, 3), 1)
75 self.failUnlessEqual(f(4, 3), 2)
76 self.failUnlessEqual(f(5, 3), 2)
77 self.failUnlessEqual(f(6, 3), 2)
78 self.failUnlessEqual(f(7, 3), 3)
80 def test_next_multiple(self):
81 f = mathutil.next_multiple
82 self.failUnlessEqual(f(5, 1), 5)
83 self.failUnlessEqual(f(5, 2), 6)
84 self.failUnlessEqual(f(5, 3), 6)
85 self.failUnlessEqual(f(5, 4), 8)
86 self.failUnlessEqual(f(5, 5), 5)
87 self.failUnlessEqual(f(5, 6), 6)
88 self.failUnlessEqual(f(32, 1), 32)
89 self.failUnlessEqual(f(32, 2), 32)
90 self.failUnlessEqual(f(32, 3), 33)
91 self.failUnlessEqual(f(32, 4), 32)
92 self.failUnlessEqual(f(32, 5), 35)
93 self.failUnlessEqual(f(32, 6), 36)
94 self.failUnlessEqual(f(32, 7), 35)
95 self.failUnlessEqual(f(32, 8), 32)
96 self.failUnlessEqual(f(32, 9), 36)
97 self.failUnlessEqual(f(32, 10), 40)
98 self.failUnlessEqual(f(32, 11), 33)
99 self.failUnlessEqual(f(32, 12), 36)
100 self.failUnlessEqual(f(32, 13), 39)
101 self.failUnlessEqual(f(32, 14), 42)
102 self.failUnlessEqual(f(32, 15), 45)
103 self.failUnlessEqual(f(32, 16), 32)
104 self.failUnlessEqual(f(32, 17), 34)
105 self.failUnlessEqual(f(32, 18), 36)
106 self.failUnlessEqual(f(32, 589), 589)
108 def test_pad_size(self):
109 f = mathutil.pad_size
110 self.failUnlessEqual(f(0, 4), 0)
111 self.failUnlessEqual(f(1, 4), 3)
112 self.failUnlessEqual(f(2, 4), 2)
113 self.failUnlessEqual(f(3, 4), 1)
114 self.failUnlessEqual(f(4, 4), 0)
115 self.failUnlessEqual(f(5, 4), 3)
117 def test_is_power_of_k(self):
118 f = mathutil.is_power_of_k
119 for i in range(1, 100):
120 if i in (1, 2, 4, 8, 16, 32, 64):
121 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
123 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
124 for i in range(1, 100):
125 if i in (1, 3, 9, 27, 81):
126 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
128 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
130 def test_next_power_of_k(self):
131 f = mathutil.next_power_of_k
132 self.failUnlessEqual(f(0,2), 1)
133 self.failUnlessEqual(f(1,2), 1)
134 self.failUnlessEqual(f(2,2), 2)
135 self.failUnlessEqual(f(3,2), 4)
136 self.failUnlessEqual(f(4,2), 4)
137 for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
138 for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
139 for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
140 for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
141 for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
143 self.failUnlessEqual(f(0,3), 1)
144 self.failUnlessEqual(f(1,3), 1)
145 self.failUnlessEqual(f(2,3), 3)
146 self.failUnlessEqual(f(3,3), 3)
147 for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
148 for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
149 for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
150 for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
154 self.failUnlessEqual(f([1,2,3]), 2)
155 self.failUnlessEqual(f([0,0,0,4]), 1)
156 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
159 class Asserts(unittest.TestCase):
160 def should_assert(self, func, *args, **kwargs):
162 func(*args, **kwargs)
163 except AssertionError, e:
166 self.fail("assert failed with non-AssertionError: %s" % e)
167 self.fail("assert was not caught")
169 def should_not_assert(self, func, *args, **kwargs):
171 regexp = kwargs["re"]
174 func(*args, **kwargs)
175 except AssertionError, e:
176 self.fail("assertion fired when it should not have: %s" % e)
178 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
182 def test_assert(self):
183 f = assertutil._assert
184 self.should_assert(f)
185 self.should_assert(f, False)
186 self.should_not_assert(f, True)
188 m = self.should_assert(f, False, "message")
189 self.failUnlessEqual(m, "'message' <type 'str'>", m)
190 m = self.should_assert(f, False, "message1", othermsg=12)
191 self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
192 m = self.should_assert(f, False, othermsg="message2")
193 self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
195 def test_precondition(self):
196 f = assertutil.precondition
197 self.should_assert(f)
198 self.should_assert(f, False)
199 self.should_not_assert(f, True)
201 m = self.should_assert(f, False, "message")
202 self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
203 m = self.should_assert(f, False, "message1", othermsg=12)
204 self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
205 m = self.should_assert(f, False, othermsg="message2")
206 self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
208 def test_postcondition(self):
209 f = assertutil.postcondition
210 self.should_assert(f)
211 self.should_assert(f, False)
212 self.should_not_assert(f, True)
214 m = self.should_assert(f, False, "message")
215 self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
216 m = self.should_assert(f, False, "message1", othermsg=12)
217 self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
218 m = self.should_assert(f, False, othermsg="message2")
219 self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
221 class FileUtil(unittest.TestCase):
222 def mkdir(self, basedir, path, mode=0777):
223 fn = os.path.join(basedir, path)
224 fileutil.make_dirs(fn, mode)
226 def touch(self, basedir, path, mode=None, data="touch\n"):
227 fn = os.path.join(basedir, path)
234 def test_rm_dir(self):
235 basedir = "util/FileUtil/test_rm_dir"
236 fileutil.make_dirs(basedir)
237 # create it again to test idempotency
238 fileutil.make_dirs(basedir)
239 d = os.path.join(basedir, "doomed")
241 self.touch(d, "a/b/1.txt")
242 self.touch(d, "a/b/2.txt", 0444)
243 self.touch(d, "a/b/3.txt", 0)
245 self.touch(d, "a/c/1.txt")
246 self.touch(d, "a/c/2.txt", 0444)
247 self.touch(d, "a/c/3.txt", 0)
248 os.chmod(os.path.join(d, "a/c"), 0444)
250 self.touch(d, "a/d/1.txt")
251 self.touch(d, "a/d/2.txt", 0444)
252 self.touch(d, "a/d/3.txt", 0)
253 os.chmod(os.path.join(d, "a/d"), 0)
256 self.failIf(os.path.exists(d))
257 # remove it again to test idempotency
260 def test_remove_if_possible(self):
261 basedir = "util/FileUtil/test_remove_if_possible"
262 fileutil.make_dirs(basedir)
263 self.touch(basedir, "here")
264 fn = os.path.join(basedir, "here")
265 fileutil.remove_if_possible(fn)
266 self.failIf(os.path.exists(fn))
267 fileutil.remove_if_possible(fn) # should be idempotent
268 fileutil.rm_dir(basedir)
269 fileutil.remove_if_possible(fn) # should survive errors
271 def test_open_or_create(self):
272 basedir = "util/FileUtil/test_open_or_create"
273 fileutil.make_dirs(basedir)
274 fn = os.path.join(basedir, "here")
275 f = fileutil.open_or_create(fn)
278 f = fileutil.open_or_create(fn)
285 self.failUnlessEqual(data, "stuff.more.")
287 def test_NamedTemporaryDirectory(self):
288 basedir = "util/FileUtil/test_NamedTemporaryDirectory"
289 fileutil.make_dirs(basedir)
290 td = fileutil.NamedTemporaryDirectory(dir=basedir)
292 self.failUnless(basedir in name)
293 self.failUnless(basedir in repr(td))
294 self.failUnless(os.path.isdir(name))
296 # it is conceivable that we need to force gc here, but I'm not sure
297 self.failIf(os.path.isdir(name))
299 def test_rename(self):
300 basedir = "util/FileUtil/test_rename"
301 fileutil.make_dirs(basedir)
302 self.touch(basedir, "here")
303 fn = os.path.join(basedir, "here")
304 fn2 = os.path.join(basedir, "there")
305 fileutil.rename(fn, fn2)
306 self.failIf(os.path.exists(fn))
307 self.failUnless(os.path.exists(fn2))
310 basedir = "util/FileUtil/test_du"
311 fileutil.make_dirs(basedir)
312 d = os.path.join(basedir, "space-consuming")
314 self.touch(d, "a/b/1.txt", data="a"*10)
315 self.touch(d, "a/b/2.txt", data="b"*11)
317 self.touch(d, "a/c/1.txt", data="c"*12)
318 self.touch(d, "a/c/2.txt", data="d"*13)
320 used = fileutil.du(basedir)
321 self.failUnlessEqual(10+11+12+13, used)
323 class PollMixinTests(unittest.TestCase):
325 self.pm = testutil.PollMixin()
327 def test_PollMixin_True(self):
328 d = self.pm.poll(check_f=lambda : True,
332 def test_PollMixin_False_then_True(self):
333 i = iter([False, True])
334 d = self.pm.poll(check_f=i.next,
338 def test_timeout(self):
339 d = self.pm.poll(check_f=lambda: False,
343 self.fail("poll should have failed, not returned %s" % (res,))
345 f.trap(testutil.TimeoutError)
346 return None # success
347 d.addCallbacks(_suc, _err)
350 class DeferredUtilTests(unittest.TestCase):
351 def test_success(self):
352 d1, d2 = defer.Deferred(), defer.Deferred()
355 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
356 dlss.addCallbacks(good.append, bad.append)
359 self.failUnlessEqual(good, [[1,2]])
360 self.failUnlessEqual(bad, [])
362 def test_failure(self):
363 d1, d2 = defer.Deferred(), defer.Deferred()
366 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
367 dlss.addCallbacks(good.append, bad.append)
368 d1.addErrback(lambda _ignore: None)
369 d2.addErrback(lambda _ignore: None)
371 d2.errback(RuntimeError())
372 self.failUnlessEqual(good, [])
373 self.failUnlessEqual(len(bad), 1)
375 self.failUnless(isinstance(f, failure.Failure))
376 self.failUnless(f.check(RuntimeError))
378 class HashUtilTests(unittest.TestCase):
379 def test_sha256d(self):
380 h1 = hashutil.tagged_hash_256d("tag1", "value")
381 h2 = hashutil.tagged_hasher_256d("tag1")
384 self.failUnlessEqual(h1, h2)
386 def test_sha256d_truncated(self):
387 h1 = hashutil.tagged_hash_256d("tag1", "value", 16)
388 h2 = hashutil.tagged_hasher_256d("tag1", 16)
391 self.failUnlessEqual(len(h1), 16)
392 self.failUnlessEqual(len(h2), 16)
393 self.failUnlessEqual(h1, h2)
396 h1 = hashutil.content_hash_key_hash(3, 10, 1000, "data")
397 h2 = hashutil.content_hash_key_hasher(3, 10, 1000)
400 self.failUnlessEqual(h1, h2)