]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
merge patch which adds base32 to test_util with patch which removes bencode from...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
2 def foo(): pass # keep the line number constant
3
4 import os
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import failure
8
9 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, testutil, deferredutil
11
12
13 class Base32(unittest.TestCase):
14     def test_b2a(self):
15         self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
16     def test_b2a_or_none(self):
17         self.failUnlessEqual(base32.b2a_or_none(None), None)
18         self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
19     def test_a2b(self):
20         self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
21         self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
22
23 class IDLib(unittest.TestCase):
24     def test_nodeid_b2a(self):
25         self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
26
27 class NoArgumentException(Exception):
28     def __init__(self):
29         pass
30
31 class HumanReadable(unittest.TestCase):
32     def test_repr(self):
33         hr = humanreadable.hr
34         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
35         self.failUnlessEqual(hr(self.test_repr),
36                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
37         self.failUnlessEqual(hr(1L), "1")
38         self.failUnlessEqual(hr(10**40),
39                              "100000000000000000...000000000000000000")
40         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
41         self.failUnlessEqual(hr([1,2]), "[1, 2]")
42         self.failUnlessEqual(hr({1:2}), "{1:2}")
43         try:
44             raise RuntimeError
45         except Exception, e:
46             self.failUnless(
47                 hr(e) == "<RuntimeError: ()>" # python-2.4
48                 or hr(e) == "RuntimeError()") # python-2.5
49         try:
50             raise RuntimeError("oops")
51         except Exception, e:
52             self.failUnless(
53                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
54                 or hr(e) == "RuntimeError('oops',)") # python-2.5
55         try:
56             raise NoArgumentException
57         except Exception, e:
58             self.failUnless(
59                 hr(e) == "<NoArgumentException>" # python-2.4
60                 or hr(e) == "NoArgumentException()") # python-2.5
61
62
63 class MyList(list):
64     pass
65
66 class Math(unittest.TestCase):
67     def test_div_ceil(self):
68         f = mathutil.div_ceil
69         self.failUnlessEqual(f(0, 1), 0)
70         self.failUnlessEqual(f(0, 2), 0)
71         self.failUnlessEqual(f(0, 3), 0)
72         self.failUnlessEqual(f(1, 3), 1)
73         self.failUnlessEqual(f(2, 3), 1)
74         self.failUnlessEqual(f(3, 3), 1)
75         self.failUnlessEqual(f(4, 3), 2)
76         self.failUnlessEqual(f(5, 3), 2)
77         self.failUnlessEqual(f(6, 3), 2)
78         self.failUnlessEqual(f(7, 3), 3)
79
80     def test_next_multiple(self):
81         f = mathutil.next_multiple
82         self.failUnlessEqual(f(5, 1), 5)
83         self.failUnlessEqual(f(5, 2), 6)
84         self.failUnlessEqual(f(5, 3), 6)
85         self.failUnlessEqual(f(5, 4), 8)
86         self.failUnlessEqual(f(5, 5), 5)
87         self.failUnlessEqual(f(5, 6), 6)
88         self.failUnlessEqual(f(32, 1), 32)
89         self.failUnlessEqual(f(32, 2), 32)
90         self.failUnlessEqual(f(32, 3), 33)
91         self.failUnlessEqual(f(32, 4), 32)
92         self.failUnlessEqual(f(32, 5), 35)
93         self.failUnlessEqual(f(32, 6), 36)
94         self.failUnlessEqual(f(32, 7), 35)
95         self.failUnlessEqual(f(32, 8), 32)
96         self.failUnlessEqual(f(32, 9), 36)
97         self.failUnlessEqual(f(32, 10), 40)
98         self.failUnlessEqual(f(32, 11), 33)
99         self.failUnlessEqual(f(32, 12), 36)
100         self.failUnlessEqual(f(32, 13), 39)
101         self.failUnlessEqual(f(32, 14), 42)
102         self.failUnlessEqual(f(32, 15), 45)
103         self.failUnlessEqual(f(32, 16), 32)
104         self.failUnlessEqual(f(32, 17), 34)
105         self.failUnlessEqual(f(32, 18), 36)
106         self.failUnlessEqual(f(32, 589), 589)
107
108     def test_pad_size(self):
109         f = mathutil.pad_size
110         self.failUnlessEqual(f(0, 4), 0)
111         self.failUnlessEqual(f(1, 4), 3)
112         self.failUnlessEqual(f(2, 4), 2)
113         self.failUnlessEqual(f(3, 4), 1)
114         self.failUnlessEqual(f(4, 4), 0)
115         self.failUnlessEqual(f(5, 4), 3)
116
117     def test_is_power_of_k(self):
118         f = mathutil.is_power_of_k
119         for i in range(1, 100):
120             if i in (1, 2, 4, 8, 16, 32, 64):
121                 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
122             else:
123                 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
124         for i in range(1, 100):
125             if i in (1, 3, 9, 27, 81):
126                 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
127             else:
128                 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
129
130     def test_next_power_of_k(self):
131         f = mathutil.next_power_of_k
132         self.failUnlessEqual(f(0,2), 1)
133         self.failUnlessEqual(f(1,2), 1)
134         self.failUnlessEqual(f(2,2), 2)
135         self.failUnlessEqual(f(3,2), 4)
136         self.failUnlessEqual(f(4,2), 4)
137         for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
138         for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
139         for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
140         for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
141         for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
142
143         self.failUnlessEqual(f(0,3), 1)
144         self.failUnlessEqual(f(1,3), 1)
145         self.failUnlessEqual(f(2,3), 3)
146         self.failUnlessEqual(f(3,3), 3)
147         for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
148         for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
149         for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
150         for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
151
152     def test_ave(self):
153         f = mathutil.ave
154         self.failUnlessEqual(f([1,2,3]), 2)
155         self.failUnlessEqual(f([0,0,0,4]), 1)
156         self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
157
158
159 class Asserts(unittest.TestCase):
160     def should_assert(self, func, *args, **kwargs):
161         try:
162             func(*args, **kwargs)
163         except AssertionError, e:
164             return str(e)
165         except Exception, e:
166             self.fail("assert failed with non-AssertionError: %s" % e)
167         self.fail("assert was not caught")
168
169     def should_not_assert(self, func, *args, **kwargs):
170         if "re" in kwargs:
171             regexp = kwargs["re"]
172             del kwargs["re"]
173         try:
174             func(*args, **kwargs)
175         except AssertionError, e:
176             self.fail("assertion fired when it should not have: %s" % e)
177         except Exception, e:
178             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
179         return # we're happy
180
181
182     def test_assert(self):
183         f = assertutil._assert
184         self.should_assert(f)
185         self.should_assert(f, False)
186         self.should_not_assert(f, True)
187
188         m = self.should_assert(f, False, "message")
189         self.failUnlessEqual(m, "'message' <type 'str'>", m)
190         m = self.should_assert(f, False, "message1", othermsg=12)
191         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
192         m = self.should_assert(f, False, othermsg="message2")
193         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
194
195     def test_precondition(self):
196         f = assertutil.precondition
197         self.should_assert(f)
198         self.should_assert(f, False)
199         self.should_not_assert(f, True)
200
201         m = self.should_assert(f, False, "message")
202         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
203         m = self.should_assert(f, False, "message1", othermsg=12)
204         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
205         m = self.should_assert(f, False, othermsg="message2")
206         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
207
208     def test_postcondition(self):
209         f = assertutil.postcondition
210         self.should_assert(f)
211         self.should_assert(f, False)
212         self.should_not_assert(f, True)
213
214         m = self.should_assert(f, False, "message")
215         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
216         m = self.should_assert(f, False, "message1", othermsg=12)
217         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
218         m = self.should_assert(f, False, othermsg="message2")
219         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
220
221 class FileUtil(unittest.TestCase):
222     def mkdir(self, basedir, path, mode=0777):
223         fn = os.path.join(basedir, path)
224         fileutil.make_dirs(fn, mode)
225
226     def touch(self, basedir, path, mode=None, data="touch\n"):
227         fn = os.path.join(basedir, path)
228         f = open(fn, "w")
229         f.write(data)
230         f.close()
231         if mode is not None:
232             os.chmod(fn, mode)
233
234     def test_rm_dir(self):
235         basedir = "util/FileUtil/test_rm_dir"
236         fileutil.make_dirs(basedir)
237         # create it again to test idempotency
238         fileutil.make_dirs(basedir)
239         d = os.path.join(basedir, "doomed")
240         self.mkdir(d, "a/b")
241         self.touch(d, "a/b/1.txt")
242         self.touch(d, "a/b/2.txt", 0444)
243         self.touch(d, "a/b/3.txt", 0)
244         self.mkdir(d, "a/c")
245         self.touch(d, "a/c/1.txt")
246         self.touch(d, "a/c/2.txt", 0444)
247         self.touch(d, "a/c/3.txt", 0)
248         os.chmod(os.path.join(d, "a/c"), 0444)
249         self.mkdir(d, "a/d")
250         self.touch(d, "a/d/1.txt")
251         self.touch(d, "a/d/2.txt", 0444)
252         self.touch(d, "a/d/3.txt", 0)
253         os.chmod(os.path.join(d, "a/d"), 0)
254
255         fileutil.rm_dir(d)
256         self.failIf(os.path.exists(d))
257         # remove it again to test idempotency
258         fileutil.rm_dir(d)
259
260     def test_remove_if_possible(self):
261         basedir = "util/FileUtil/test_remove_if_possible"
262         fileutil.make_dirs(basedir)
263         self.touch(basedir, "here")
264         fn = os.path.join(basedir, "here")
265         fileutil.remove_if_possible(fn)
266         self.failIf(os.path.exists(fn))
267         fileutil.remove_if_possible(fn) # should be idempotent
268         fileutil.rm_dir(basedir)
269         fileutil.remove_if_possible(fn) # should survive errors
270
271     def test_open_or_create(self):
272         basedir = "util/FileUtil/test_open_or_create"
273         fileutil.make_dirs(basedir)
274         fn = os.path.join(basedir, "here")
275         f = fileutil.open_or_create(fn)
276         f.write("stuff.")
277         f.close()
278         f = fileutil.open_or_create(fn)
279         f.seek(0, 2)
280         f.write("more.")
281         f.close()
282         f = open(fn, "r")
283         data = f.read()
284         f.close()
285         self.failUnlessEqual(data, "stuff.more.")
286
287     def test_NamedTemporaryDirectory(self):
288         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
289         fileutil.make_dirs(basedir)
290         td = fileutil.NamedTemporaryDirectory(dir=basedir)
291         name = td.name
292         self.failUnless(basedir in name)
293         self.failUnless(basedir in repr(td))
294         self.failUnless(os.path.isdir(name))
295         del td
296         # it is conceivable that we need to force gc here, but I'm not sure
297         self.failIf(os.path.isdir(name))
298
299     def test_rename(self):
300         basedir = "util/FileUtil/test_rename"
301         fileutil.make_dirs(basedir)
302         self.touch(basedir, "here")
303         fn = os.path.join(basedir, "here")
304         fn2 = os.path.join(basedir, "there")
305         fileutil.rename(fn, fn2)
306         self.failIf(os.path.exists(fn))
307         self.failUnless(os.path.exists(fn2))
308
309     def test_du(self):
310         basedir = "util/FileUtil/test_du"
311         fileutil.make_dirs(basedir)
312         d = os.path.join(basedir, "space-consuming")
313         self.mkdir(d, "a/b")
314         self.touch(d, "a/b/1.txt", data="a"*10)
315         self.touch(d, "a/b/2.txt", data="b"*11)
316         self.mkdir(d, "a/c")
317         self.touch(d, "a/c/1.txt", data="c"*12)
318         self.touch(d, "a/c/2.txt", data="d"*13)
319
320         used = fileutil.du(basedir)
321         self.failUnlessEqual(10+11+12+13, used)
322
323 class PollMixinTests(unittest.TestCase):
324     def setUp(self):
325         self.pm = testutil.PollMixin()
326
327     def test_PollMixin_True(self):
328         d = self.pm.poll(check_f=lambda : True,
329                          pollinterval=0.1)
330         return d
331
332     def test_PollMixin_False_then_True(self):
333         i = iter([False, True])
334         d = self.pm.poll(check_f=i.next,
335                          pollinterval=0.1)
336         return d
337
338     def test_timeout(self):
339         d = self.pm.poll(check_f=lambda: False,
340                          pollinterval=0.01,
341                          timeout=1)
342         def _suc(res):
343             self.fail("poll should have failed, not returned %s" % (res,))
344         def _err(f):
345             f.trap(testutil.TimeoutError)
346             return None # success
347         d.addCallbacks(_suc, _err)
348         return d
349
350 class DeferredUtilTests(unittest.TestCase):
351     def test_success(self):
352         d1, d2 = defer.Deferred(), defer.Deferred()
353         good = []
354         bad = []
355         dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
356         dlss.addCallbacks(good.append, bad.append)
357         d1.callback(1)
358         d2.callback(2)
359         self.failUnlessEqual(good, [[1,2]])
360         self.failUnlessEqual(bad, [])
361
362     def test_failure(self):
363         d1, d2 = defer.Deferred(), defer.Deferred()
364         good = []
365         bad = []
366         dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
367         dlss.addCallbacks(good.append, bad.append)
368         d1.addErrback(lambda _ignore: None)
369         d2.addErrback(lambda _ignore: None)
370         d1.callback(1)
371         d2.errback(RuntimeError())
372         self.failUnlessEqual(good, [])
373         self.failUnlessEqual(len(bad), 1)
374         f = bad[0]
375         self.failUnless(isinstance(f, failure.Failure))
376         self.failUnless(f.check(RuntimeError))
377
378 class HashUtilTests(unittest.TestCase):
379     def test_sha256d(self):
380         h1 = hashutil.tagged_hash_256d("tag1", "value")
381         h2 = hashutil.tagged_hasher_256d("tag1")
382         h2.update("value")
383         h2 = h2.digest()
384         self.failUnlessEqual(h1, h2)
385
386     def test_sha256d_truncated(self):
387         h1 = hashutil.tagged_hash_256d("tag1", "value", 16)
388         h2 = hashutil.tagged_hasher_256d("tag1", 16)
389         h2.update("value")
390         h2 = h2.digest()
391         self.failUnlessEqual(len(h1), 16)
392         self.failUnlessEqual(len(h2), 16)
393         self.failUnlessEqual(h1, h2)
394
395     def test_chk(self):
396         h1 = hashutil.content_hash_key_hash(3, 10, 1000, "data")
397         h2 = hashutil.content_hash_key_hasher(3, 10, 1000)
398         h2.update("data")
399         h2 = h2.digest()
400         self.failUnlessEqual(h1, h2)