]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
logging: only test log.err when Twisted is new enough to let us ignore the generated...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
2 def foo(): pass # keep the line number constant
3
4 import os
5 from twisted.trial import unittest
6
7 from twisted.python import failure
8 from twisted.python import log as twisted_log
9 from allmydata.util import bencode, idlib, humanreadable, mathutil
10 from allmydata.util import assertutil, fileutil, testutil, log
11
12
13 class IDLib(unittest.TestCase):
14     def test_b2a(self):
15         self.failUnlessEqual(idlib.b2a("\x12\x34"), "ne4y")
16     def test_b2a_or_none(self):
17         self.failUnlessEqual(idlib.b2a_or_none(None), None)
18         self.failUnlessEqual(idlib.b2a_or_none("\x12\x34"), "ne4y")
19     def test_a2b(self):
20         self.failUnlessEqual(idlib.a2b("ne4y"), "\x12\x34")
21         self.failUnlessRaises(AssertionError, idlib.a2b, "b0gus")
22     def test_nodeid_b2a(self):
23         self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
24
25 class NoArgumentException(Exception):
26     def __init__(self):
27         pass
28
29 class HumanReadable(unittest.TestCase):
30     def test_repr(self):
31         hr = humanreadable.hr
32         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
33         self.failUnlessEqual(hr(self.test_repr),
34                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
35         self.failUnlessEqual(hr(1L), "1")
36         self.failUnlessEqual(hr(10**40),
37                              "100000000000000000...000000000000000000")
38         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
39         self.failUnlessEqual(hr([1,2]), "[1, 2]")
40         self.failUnlessEqual(hr({1:2}), "{1:2}")
41         try:
42             raise RuntimeError
43         except Exception, e:
44             self.failUnless(
45                 hr(e) == "<RuntimeError: ()>" # python-2.4
46                 or hr(e) == "RuntimeError()") # python-2.5
47         try:
48             raise RuntimeError("oops")
49         except Exception, e:
50             self.failUnless(
51                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
52                 or hr(e) == "RuntimeError('oops',)") # python-2.5
53         try:
54             raise NoArgumentException
55         except Exception, e:
56             self.failUnless(
57                 hr(e) == "<NoArgumentException>" # python-2.4
58                 or hr(e) == "NoArgumentException()") # python-2.5
59
60
61 class MyList(list):
62     pass
63
64 class Bencode(unittest.TestCase):
65     def test_bencode(self):
66         e = bencode.bencode
67         self.failUnlessEqual(e(4), "i4e")
68         self.failUnlessEqual(e([1,2]), "li1ei2ee")
69         self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
70         self.failUnlessEqual(e({1:2}), "di1ei2ee")
71         self.failUnlessEqual(e(u"a"), "u1:a")
72         self.failUnlessEqual(e([True,False]), "lb1b0e")
73         self.failUnlessEqual(e(1.5), "f1.5e")
74         self.failUnlessEqual(e("foo"), "3:foo")
75         d = bencode.bdecode
76         self.failUnlessEqual(d("li1ei2ee"), [1,2])
77         self.failUnlessEqual(d("u1:a"), u"a")
78         self.failUnlessRaises(ValueError, d, "u10:short")
79         self.failUnlessEqual(d("lb1b0e"), [True,False])
80         self.failUnlessRaises(ValueError, d, "b2")
81         self.failUnlessEqual(d("f1.5e"), 1.5)
82         self.failUnlessEqual(d("3:foo"), "foo")
83         self.failUnlessRaises(ValueError, d,
84                               "38:When doing layout, always plan ah")
85         # ooh! fascinating! bdecode requires string keys! I think this ought
86         # to be changed
87         #self.failUnlessEqual(d("di1ei2ee"), {1:2})
88         self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
89         self.failUnlessRaises(ValueError, d, "di1ei2ee")
90         self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
91
92         self.failUnlessRaises(ValueError, d, "i1ei2e")
93
94         # now run all the module's builtin tests
95         bencode.test_decode_raw_string()
96         bencode.test_encode_and_decode_unicode_results_in_unicode_type()
97         bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
98         bencode.test_dict_forbids_non_string_key()
99         bencode.test_dict_forbids_key_repeat()
100         bencode.test_empty_dict()
101         bencode.test_dict_allows_unicode_keys()
102         bencode.test_ValueError_in_decode_unknown()
103         bencode.test_encode_and_decode_none()
104         bencode.test_encode_and_decode_long()
105         bencode.test_encode_and_decode_int()
106         bencode.test_encode_and_decode_float()
107         bencode.test_encode_and_decode_bool()
108         #bencode.test_decode_noncanonical_int()
109         bencode.test_encode_and_decode_dict()
110         bencode.test_encode_and_decode_list()
111         bencode.test_encode_and_decode_tuple()
112         bencode.test_encode_and_decode_empty_dict()
113         bencode.test_encode_and_decode_complex_object()
114         bencode.test_unfinished_list()
115         bencode.test_unfinished_dict()
116         bencode.test_unsupported_type()
117
118 class Math(unittest.TestCase):
119     def test_div_ceil(self):
120         f = mathutil.div_ceil
121         self.failUnlessEqual(f(0, 1), 0)
122         self.failUnlessEqual(f(0, 2), 0)
123         self.failUnlessEqual(f(0, 3), 0)
124         self.failUnlessEqual(f(1, 3), 1)
125         self.failUnlessEqual(f(2, 3), 1)
126         self.failUnlessEqual(f(3, 3), 1)
127         self.failUnlessEqual(f(4, 3), 2)
128         self.failUnlessEqual(f(5, 3), 2)
129         self.failUnlessEqual(f(6, 3), 2)
130         self.failUnlessEqual(f(7, 3), 3)
131
132     def test_next_multiple(self):
133         f = mathutil.next_multiple
134         self.failUnlessEqual(f(5, 1), 5)
135         self.failUnlessEqual(f(5, 2), 6)
136         self.failUnlessEqual(f(5, 3), 6)
137         self.failUnlessEqual(f(5, 4), 8)
138         self.failUnlessEqual(f(5, 5), 5)
139         self.failUnlessEqual(f(5, 6), 6)
140         self.failUnlessEqual(f(32, 1), 32)
141         self.failUnlessEqual(f(32, 2), 32)
142         self.failUnlessEqual(f(32, 3), 33)
143         self.failUnlessEqual(f(32, 4), 32)
144         self.failUnlessEqual(f(32, 5), 35)
145         self.failUnlessEqual(f(32, 6), 36)
146         self.failUnlessEqual(f(32, 7), 35)
147         self.failUnlessEqual(f(32, 8), 32)
148         self.failUnlessEqual(f(32, 9), 36)
149         self.failUnlessEqual(f(32, 10), 40)
150         self.failUnlessEqual(f(32, 11), 33)
151         self.failUnlessEqual(f(32, 12), 36)
152         self.failUnlessEqual(f(32, 13), 39)
153         self.failUnlessEqual(f(32, 14), 42)
154         self.failUnlessEqual(f(32, 15), 45)
155         self.failUnlessEqual(f(32, 16), 32)
156         self.failUnlessEqual(f(32, 17), 34)
157         self.failUnlessEqual(f(32, 18), 36)
158         self.failUnlessEqual(f(32, 589), 589)
159
160     def test_pad_size(self):
161         f = mathutil.pad_size
162         self.failUnlessEqual(f(0, 4), 0)
163         self.failUnlessEqual(f(1, 4), 3)
164         self.failUnlessEqual(f(2, 4), 2)
165         self.failUnlessEqual(f(3, 4), 1)
166         self.failUnlessEqual(f(4, 4), 0)
167         self.failUnlessEqual(f(5, 4), 3)
168
169     def test_is_power_of_k(self):
170         f = mathutil.is_power_of_k
171         for i in range(1, 100):
172             if i in (1, 2, 4, 8, 16, 32, 64):
173                 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
174             else:
175                 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
176         for i in range(1, 100):
177             if i in (1, 3, 9, 27, 81):
178                 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
179             else:
180                 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
181
182     def test_next_power_of_k(self):
183         f = mathutil.next_power_of_k
184         self.failUnlessEqual(f(0,2), 1)
185         self.failUnlessEqual(f(1,2), 1)
186         self.failUnlessEqual(f(2,2), 2)
187         self.failUnlessEqual(f(3,2), 4)
188         self.failUnlessEqual(f(4,2), 4)
189         for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
190         for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
191         for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
192         for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
193         for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
194
195         self.failUnlessEqual(f(0,3), 1)
196         self.failUnlessEqual(f(1,3), 1)
197         self.failUnlessEqual(f(2,3), 3)
198         self.failUnlessEqual(f(3,3), 3)
199         for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
200         for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
201         for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
202         for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
203
204     def test_ave(self):
205         f = mathutil.ave
206         self.failUnlessEqual(f([1,2,3]), 2)
207         self.failUnlessEqual(f([0,0,0,4]), 1)
208         self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
209
210
211 class Asserts(unittest.TestCase):
212     def should_assert(self, func, *args, **kwargs):
213         try:
214             func(*args, **kwargs)
215         except AssertionError, e:
216             return str(e)
217         except Exception, e:
218             self.fail("assert failed with non-AssertionError: %s" % e)
219         self.fail("assert was not caught")
220
221     def should_not_assert(self, func, *args, **kwargs):
222         if "re" in kwargs:
223             regexp = kwargs["re"]
224             del kwargs["re"]
225         try:
226             func(*args, **kwargs)
227         except AssertionError, e:
228             self.fail("assertion fired when it should not have: %s" % e)
229         except Exception, e:
230             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
231         return # we're happy
232
233
234     def test_assert(self):
235         f = assertutil._assert
236         self.should_assert(f)
237         self.should_assert(f, False)
238         self.should_not_assert(f, True)
239
240         m = self.should_assert(f, False, "message")
241         self.failUnlessEqual(m, "'message' <type 'str'>", m)
242         m = self.should_assert(f, False, "message1", othermsg=12)
243         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
244         m = self.should_assert(f, False, othermsg="message2")
245         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
246
247     def test_precondition(self):
248         f = assertutil.precondition
249         self.should_assert(f)
250         self.should_assert(f, False)
251         self.should_not_assert(f, True)
252
253         m = self.should_assert(f, False, "message")
254         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
255         m = self.should_assert(f, False, "message1", othermsg=12)
256         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
257         m = self.should_assert(f, False, othermsg="message2")
258         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
259
260     def test_postcondition(self):
261         f = assertutil.postcondition
262         self.should_assert(f)
263         self.should_assert(f, False)
264         self.should_not_assert(f, True)
265
266         m = self.should_assert(f, False, "message")
267         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
268         m = self.should_assert(f, False, "message1", othermsg=12)
269         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
270         m = self.should_assert(f, False, othermsg="message2")
271         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
272
273 class FileUtil(unittest.TestCase):
274     def mkdir(self, basedir, path, mode=0777):
275         fn = os.path.join(basedir, path)
276         fileutil.make_dirs(fn, mode)
277
278     def touch(self, basedir, path, mode=None, data="touch\n"):
279         fn = os.path.join(basedir, path)
280         f = open(fn, "w")
281         f.write(data)
282         f.close()
283         if mode is not None:
284             os.chmod(fn, mode)
285
286     def test_rm_dir(self):
287         basedir = "util/FileUtil/test_rm_dir"
288         fileutil.make_dirs(basedir)
289         # create it again to test idempotency
290         fileutil.make_dirs(basedir)
291         d = os.path.join(basedir, "doomed")
292         self.mkdir(d, "a/b")
293         self.touch(d, "a/b/1.txt")
294         self.touch(d, "a/b/2.txt", 0444)
295         self.touch(d, "a/b/3.txt", 0)
296         self.mkdir(d, "a/c")
297         self.touch(d, "a/c/1.txt")
298         self.touch(d, "a/c/2.txt", 0444)
299         self.touch(d, "a/c/3.txt", 0)
300         os.chmod(os.path.join(d, "a/c"), 0444)
301         self.mkdir(d, "a/d")
302         self.touch(d, "a/d/1.txt")
303         self.touch(d, "a/d/2.txt", 0444)
304         self.touch(d, "a/d/3.txt", 0)
305         os.chmod(os.path.join(d, "a/d"), 0)
306
307         fileutil.rm_dir(d)
308         self.failIf(os.path.exists(d))
309         # remove it again to test idempotency
310         fileutil.rm_dir(d)
311
312     def test_remove_if_possible(self):
313         basedir = "util/FileUtil/test_remove_if_possible"
314         fileutil.make_dirs(basedir)
315         self.touch(basedir, "here")
316         fn = os.path.join(basedir, "here")
317         fileutil.remove_if_possible(fn)
318         self.failIf(os.path.exists(fn))
319         fileutil.remove_if_possible(fn) # should be idempotent
320         fileutil.rm_dir(basedir)
321         fileutil.remove_if_possible(fn) # should survive errors
322
323     def test_open_or_create(self):
324         basedir = "util/FileUtil/test_open_or_create"
325         fileutil.make_dirs(basedir)
326         fn = os.path.join(basedir, "here")
327         f = fileutil.open_or_create(fn)
328         f.write("stuff.")
329         f.close()
330         f = fileutil.open_or_create(fn)
331         f.seek(0, 2)
332         f.write("more.")
333         f.close()
334         f = open(fn, "r")
335         data = f.read()
336         f.close()
337         self.failUnlessEqual(data, "stuff.more.")
338
339     def test_NamedTemporaryDirectory(self):
340         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
341         fileutil.make_dirs(basedir)
342         td = fileutil.NamedTemporaryDirectory(dir=basedir)
343         name = td.name
344         self.failUnless(basedir in name)
345         self.failUnless(basedir in repr(td))
346         self.failUnless(os.path.isdir(name))
347         del td
348         # it is conceivable that we need to force gc here, but I'm not sure
349         self.failIf(os.path.isdir(name))
350
351     def test_rename(self):
352         basedir = "util/FileUtil/test_rename"
353         fileutil.make_dirs(basedir)
354         self.touch(basedir, "here")
355         fn = os.path.join(basedir, "here")
356         fn2 = os.path.join(basedir, "there")
357         fileutil.rename(fn, fn2)
358         self.failIf(os.path.exists(fn))
359         self.failUnless(os.path.exists(fn2))
360
361     def test_du(self):
362         basedir = "util/FileUtil/test_du"
363         fileutil.make_dirs(basedir)
364         d = os.path.join(basedir, "space-consuming")
365         self.mkdir(d, "a/b")
366         self.touch(d, "a/b/1.txt", data="a"*10)
367         self.touch(d, "a/b/2.txt", data="b"*11)
368         self.mkdir(d, "a/c")
369         self.touch(d, "a/c/1.txt", data="c"*12)
370         self.touch(d, "a/c/2.txt", data="d"*13)
371
372         used = fileutil.du(basedir)
373         self.failUnlessEqual(10+11+12+13, used)
374
375 class PollMixinTests(unittest.TestCase):
376     def setUp(self):
377         self.pm = testutil.PollMixin()
378
379     def _check(self, d):
380         def fail_unless_arg_is_true(arg):
381             self.failUnless(arg is True, repr(arg))
382         d.addCallback(fail_unless_arg_is_true)
383         return d
384
385     def test_PollMixin_True(self):
386         d = self.pm.poll(check_f=lambda : True,
387                          pollinterval=0.1)
388         return self._check(d)
389
390     def test_PollMixin_False_then_True(self):
391         i = iter([False, True])
392         d = self.pm.poll(check_f=i.next,
393                          pollinterval=0.1)
394         return self._check(d)
395
396 class SampleError(Exception):
397     pass
398
399 class Log(unittest.TestCase):
400     def setUp(self):
401         self.catcher = []
402         twisted_log.addObserver(self.catcher.append)
403     def tearDown(self):
404         twisted_log.removeObserver(self.catcher.append)
405
406     def test_log(self):
407         num = log.msg("this is a message")
408         self.failUnless(isinstance(num, int))
409         log.msg("sub message", parent=num)
410         log.msg("numbered message", number=47)
411
412         logs = self.catcher[:]
413
414         self.failUnlessEqual(logs[0]['message'], ("this is a message",))
415         self.failUnlessEqual(logs[0]['number'], num)
416         self.failUnlessEqual(logs[0]['parent'], None)
417
418         self.failUnlessEqual(logs[1]['message'], ("sub message",))
419         self.failUnlessEqual(logs[1]['number'], num+1)
420         self.failUnlessEqual(logs[1]['parent'], num)
421
422         self.failUnlessEqual(logs[2]['message'], ("numbered message",))
423         self.failUnlessEqual(logs[2]['number'], 47)
424         self.failUnlessEqual(logs[2]['parent'], None)
425
426     def test_err(self):
427         if not hasattr(self, "flushLoggedErrors"):
428             raise unittest.SkipTest("need newer Twisted to test log.err")
429
430         f = failure.Failure(SampleError())
431         num2 = log.err(f)
432         log.err(f, parent=num2)
433         log.err(f, number=48)
434
435         logs = self.catcher[:]
436         self.flushLoggedErrors(SampleError)
437
438         self.failUnlessEqual(logs[0]['message'], ())
439         self.failUnlessEqual(logs[0]['failure'], f)
440         self.failUnlessEqual(logs[0]['number'], num2)
441         self.failUnlessEqual(logs[0]['parent'], None)
442
443         self.failUnlessEqual(logs[1]['message'], ())
444         self.failUnlessEqual(logs[1]['failure'], f)
445         self.failUnlessEqual(logs[1]['number'], num2+1)
446         self.failUnlessEqual(logs[1]['parent'], num2)
447
448         self.failUnlessEqual(logs[2]['message'], ())
449         self.failUnlessEqual(logs[2]['failure'], f)
450         self.failUnlessEqual(logs[2]['number'], 48)
451         self.failUnlessEqual(logs[2]['parent'], None)
452