]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
stabilize on 20-byte nodeids everywhere, printed with foolscap's base32
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
def foo(): pass # must stay on line 2 of this file: HumanReadable.test_repr expects "<foo() at test_util.py:2>"
3
4 import os
5 from twisted.trial import unittest
6
7 from allmydata.util import bencode, idlib, humanreadable, mathutil
8 from allmydata.util import assertutil, fileutil, testutil
9
10
class IDLib(unittest.TestCase):
    """Checks for idlib's binary <-> ascii conversion helpers."""

    def test_b2a(self):
        # two raw bytes come out as a four-character string
        encoded = idlib.b2a("\x12\x34")
        self.failUnlessEqual(encoded, "ne4y")

    def test_b2a_or_none(self):
        # None is returned unchanged; a real string gives the same result
        # that test_b2a expects from b2a
        for raw, expected in [(None, None), ("\x12\x34", "ne4y")]:
            self.failUnlessEqual(idlib.b2a_or_none(raw), expected)

    def test_a2b(self):
        decoded = idlib.a2b("ne4y")
        self.failUnlessEqual(decoded, "\x12\x34")
        # "b0gus" must be refused with an AssertionError
        self.failUnlessRaises(AssertionError, idlib.a2b, "b0gus")

    def test_nodeid_b2a(self):
        # a 20-byte all-zero nodeid is printed as 32 "a" characters
        zero_nodeid = "\x00" * 20
        self.failUnlessEqual(idlib.nodeid_b2a(zero_nodeid), "a" * 32)
22
class NoArgumentException(Exception):
    """Exception whose constructor accepts no arguments.

    HumanReadable.test_repr raises it to check how hr() renders such an
    exception.
    """
    def __init__(self):
        # intentionally empty: takes no arguments and does not call
        # Exception.__init__
        return
26
27 class HumanReadable(unittest.TestCase):
28     def test_repr(self):
29         hr = humanreadable.hr
30         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
31         self.failUnlessEqual(hr(self.test_repr),
32                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
33         self.failUnlessEqual(hr(1L), "1")
34         self.failUnlessEqual(hr(10**40),
35                              "100000000000000000...000000000000000000")
36         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
37         self.failUnlessEqual(hr([1,2]), "[1, 2]")
38         self.failUnlessEqual(hr({1:2}), "{1:2}")
39         try:
40             raise RuntimeError
41         except Exception, e:
42             self.failUnless(
43                 hr(e) == "<RuntimeError: ()>" # python-2.4
44                 or hr(e) == "RuntimeError()") # python-2.5
45         try:
46             raise RuntimeError("oops")
47         except Exception, e:
48             self.failUnless(
49                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
50                 or hr(e) == "RuntimeError('oops',)") # python-2.5
51         try:
52             raise NoArgumentException
53         except Exception, e:
54             self.failUnless(
55                 hr(e) == "<NoArgumentException>" # python-2.4
56                 or hr(e) == "NoArgumentException()") # python-2.5
57
58
class MyList(list):
    """Trivial list subclass.

    Bencode.test_bencode uses it to check that a list subclass encodes the
    same way as a plain list.
    """
61
class Bencode(unittest.TestCase):
    """Encode/decode checks for allmydata.util.bencode."""

    def test_bencode(self):
        encode = bencode.bencode
        # (value, expected encoding) pairs, checked in this order
        encode_cases = [
            (4, "i4e"),
            ([1,2], "li1ei2ee"),
            (MyList([1,2]), "li1ei2ee"),
            ({1:2}, "di1ei2ee"),
            (u"a", "u1:a"),
            ([True,False], "lb1b0e"),
            (1.5, "f1.5e"),
            ("foo", "3:foo"),
        ]
        for value, wire in encode_cases:
            self.failUnlessEqual(encode(value), wire)

        decode = bencode.bdecode

        def decodes_to(wire, value):
            # the encoded string must decode to exactly this value
            self.failUnlessEqual(decode(wire), value)

        def rejected(wire):
            # the encoded string must make bdecode raise ValueError
            self.failUnlessRaises(ValueError, decode, wire)

        decodes_to("li1ei2ee", [1,2])
        decodes_to("u1:a", u"a")
        rejected("u10:short")
        decodes_to("lb1b0e", [True,False])
        rejected("b2")
        decodes_to("f1.5e", 1.5)
        decodes_to("3:foo", "foo")
        rejected("38:When doing layout, always plan ah")
        # bdecode insists on string keys (arguably this ought to change), so
        # the integer-keyed dict that bencode happily produced above does not
        # decode:
        #decodes_to("di1ei2ee", {1:2})
        decodes_to("d1:ai2eu1:bi3ee", {"a":2, u"b":3})
        rejected("di1ei2ee")
        rejected("d1:ai1e1:ai2ee")

        rejected("i1ei2e")

        # finally, run the self-tests that ship inside the bencode module
        bencode.test_decode_raw_string()
        bencode.test_encode_and_decode_unicode_results_in_unicode_type()
        bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
        bencode.test_dict_forbids_non_string_key()
        bencode.test_dict_forbids_key_repeat()
        bencode.test_empty_dict()
        bencode.test_dict_allows_unicode_keys()
        bencode.test_ValueError_in_decode_unknown()
        bencode.test_encode_and_decode_none()
        bencode.test_encode_and_decode_long()
        bencode.test_encode_and_decode_int()
        bencode.test_encode_and_decode_float()
        bencode.test_encode_and_decode_bool()
        #bencode.test_decode_noncanonical_int()
        bencode.test_encode_and_decode_dict()
        bencode.test_encode_and_decode_list()
        bencode.test_encode_and_decode_tuple()
        bencode.test_encode_and_decode_empty_dict()
        bencode.test_encode_and_decode_complex_object()
        bencode.test_unfinished_list()
        bencode.test_unfinished_dict()
        bencode.test_unsupported_type()
115
class Math(unittest.TestCase):
    """Value checks for the integer helpers in allmydata.util.mathutil."""

    def test_div_ceil(self):
        # expected values are n/d rounded up to the next integer
        f = mathutil.div_ceil
        self.failUnlessEqual(f(0, 1), 0)
        self.failUnlessEqual(f(0, 2), 0)
        self.failUnlessEqual(f(0, 3), 0)
        self.failUnlessEqual(f(1, 3), 1)
        self.failUnlessEqual(f(2, 3), 1)
        self.failUnlessEqual(f(3, 3), 1)
        self.failUnlessEqual(f(4, 3), 2)
        self.failUnlessEqual(f(5, 3), 2)
        self.failUnlessEqual(f(6, 3), 2)
        self.failUnlessEqual(f(7, 3), 3)

    def test_next_multiple(self):
        # expected values are the smallest multiple of k that is >= n
        f = mathutil.next_multiple
        self.failUnlessEqual(f(5, 1), 5)
        self.failUnlessEqual(f(5, 2), 6)
        self.failUnlessEqual(f(5, 3), 6)
        self.failUnlessEqual(f(5, 4), 8)
        self.failUnlessEqual(f(5, 5), 5)
        self.failUnlessEqual(f(5, 6), 6)
        self.failUnlessEqual(f(32, 1), 32)
        self.failUnlessEqual(f(32, 2), 32)
        self.failUnlessEqual(f(32, 3), 33)
        self.failUnlessEqual(f(32, 4), 32)
        self.failUnlessEqual(f(32, 5), 35)
        self.failUnlessEqual(f(32, 6), 36)
        self.failUnlessEqual(f(32, 7), 35)
        self.failUnlessEqual(f(32, 8), 32)
        self.failUnlessEqual(f(32, 9), 36)
        self.failUnlessEqual(f(32, 10), 40)
        self.failUnlessEqual(f(32, 11), 33)
        self.failUnlessEqual(f(32, 12), 36)
        self.failUnlessEqual(f(32, 13), 39)
        self.failUnlessEqual(f(32, 14), 42)
        self.failUnlessEqual(f(32, 15), 45)
        self.failUnlessEqual(f(32, 16), 32)
        self.failUnlessEqual(f(32, 17), 34)
        self.failUnlessEqual(f(32, 18), 36)
        self.failUnlessEqual(f(32, 589), 589)

    def test_pad_size(self):
        # expected values are what must be added to n to reach a multiple of k
        f = mathutil.pad_size
        self.failUnlessEqual(f(0, 4), 0)
        self.failUnlessEqual(f(1, 4), 3)
        self.failUnlessEqual(f(2, 4), 2)
        self.failUnlessEqual(f(3, 4), 1)
        self.failUnlessEqual(f(4, 4), 0)
        self.failUnlessEqual(f(5, 4), 3)

    def test_is_power_of_k(self):
        # every i in 1..99 is checked against the explicit list of powers
        f = mathutil.is_power_of_k
        for i in range(1, 100):
            if i in (1, 2, 4, 8, 16, 32, 64):
                self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
            else:
                self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
        for i in range(1, 100):
            if i in (1, 3, 9, 27, 81):
                self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
            else:
                self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)

    def test_next_power_of_k(self):
        # expected values are the smallest power of k that is >= n.
        # Each range's upper bound is the power itself plus one, so the exact
        # powers (8, 16, 32, 64 and 9, 27, 81) are checked too; range()
        # excludes its end value, and those boundaries are the inputs most
        # likely to expose a rounding error.
        f = mathutil.next_power_of_k
        self.failUnlessEqual(f(0,2), 1)
        self.failUnlessEqual(f(1,2), 1)
        self.failUnlessEqual(f(2,2), 2)
        self.failUnlessEqual(f(3,2), 4)
        self.failUnlessEqual(f(4,2), 4)
        for i in range(5, 9): self.failUnlessEqual(f(i,2), 8, "%d" % i)
        for i in range(9, 17): self.failUnlessEqual(f(i,2), 16, "%d" % i)
        for i in range(17, 33): self.failUnlessEqual(f(i,2), 32, "%d" % i)
        for i in range(33, 65): self.failUnlessEqual(f(i,2), 64, "%d" % i)
        for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)

        self.failUnlessEqual(f(0,3), 1)
        self.failUnlessEqual(f(1,3), 1)
        self.failUnlessEqual(f(2,3), 3)
        self.failUnlessEqual(f(3,3), 3)
        for i in range(4, 10): self.failUnlessEqual(f(i,3), 9, "%d" % i)
        for i in range(10, 28): self.failUnlessEqual(f(i,3), 27, "%d" % i)
        for i in range(28, 82): self.failUnlessEqual(f(i,3), 81, "%d" % i)
        for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)

    def test_ave(self):
        # integer inputs are expected to give integer averages here; the
        # float case is compared approximately
        f = mathutil.ave
        self.failUnlessEqual(f([1,2,3]), 2)
        self.failUnlessEqual(f([0,0,0,4]), 1)
        self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
207
208
209 class Asserts(unittest.TestCase):
210     def should_assert(self, func, *args, **kwargs):
211         try:
212             func(*args, **kwargs)
213         except AssertionError, e:
214             return str(e)
215         except Exception, e:
216             self.fail("assert failed with non-AssertionError: %s" % e)
217         self.fail("assert was not caught")
218
219     def should_not_assert(self, func, *args, **kwargs):
220         if "re" in kwargs:
221             regexp = kwargs["re"]
222             del kwargs["re"]
223         try:
224             func(*args, **kwargs)
225         except AssertionError, e:
226             self.fail("assertion fired when it should not have: %s" % e)
227         except Exception, e:
228             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
229         return # we're happy
230
231
232     def test_assert(self):
233         f = assertutil._assert
234         self.should_assert(f)
235         self.should_assert(f, False)
236         self.should_not_assert(f, True)
237
238         m = self.should_assert(f, False, "message")
239         self.failUnlessEqual(m, "'message' <type 'str'>", m)
240         m = self.should_assert(f, False, "message1", othermsg=12)
241         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
242         m = self.should_assert(f, False, othermsg="message2")
243         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
244
245     def test_precondition(self):
246         f = assertutil.precondition
247         self.should_assert(f)
248         self.should_assert(f, False)
249         self.should_not_assert(f, True)
250
251         m = self.should_assert(f, False, "message")
252         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
253         m = self.should_assert(f, False, "message1", othermsg=12)
254         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
255         m = self.should_assert(f, False, othermsg="message2")
256         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
257
258     def test_postcondition(self):
259         f = assertutil.postcondition
260         self.should_assert(f)
261         self.should_assert(f, False)
262         self.should_not_assert(f, True)
263
264         m = self.should_assert(f, False, "message")
265         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
266         m = self.should_assert(f, False, "message1", othermsg=12)
267         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
268         m = self.should_assert(f, False, othermsg="message2")
269         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
270
271 class FileUtil(unittest.TestCase):
272     def mkdir(self, basedir, path, mode=0777):
273         fn = os.path.join(basedir, path)
274         fileutil.make_dirs(fn, mode)
275
276     def touch(self, basedir, path, mode=None, data="touch\n"):
277         fn = os.path.join(basedir, path)
278         f = open(fn, "w")
279         f.write(data)
280         f.close()
281         if mode is not None:
282             os.chmod(fn, mode)
283
284     def test_rm_dir(self):
285         basedir = "util/FileUtil/test_rm_dir"
286         fileutil.make_dirs(basedir)
287         # create it again to test idempotency
288         fileutil.make_dirs(basedir)
289         d = os.path.join(basedir, "doomed")
290         self.mkdir(d, "a/b")
291         self.touch(d, "a/b/1.txt")
292         self.touch(d, "a/b/2.txt", 0444)
293         self.touch(d, "a/b/3.txt", 0)
294         self.mkdir(d, "a/c")
295         self.touch(d, "a/c/1.txt")
296         self.touch(d, "a/c/2.txt", 0444)
297         self.touch(d, "a/c/3.txt", 0)
298         os.chmod(os.path.join(d, "a/c"), 0444)
299         self.mkdir(d, "a/d")
300         self.touch(d, "a/d/1.txt")
301         self.touch(d, "a/d/2.txt", 0444)
302         self.touch(d, "a/d/3.txt", 0)
303         os.chmod(os.path.join(d, "a/d"), 0)
304
305         fileutil.rm_dir(d)
306         self.failIf(os.path.exists(d))
307         # remove it again to test idempotency
308         fileutil.rm_dir(d)
309
310     def test_remove_if_possible(self):
311         basedir = "util/FileUtil/test_remove_if_possible"
312         fileutil.make_dirs(basedir)
313         self.touch(basedir, "here")
314         fn = os.path.join(basedir, "here")
315         fileutil.remove_if_possible(fn)
316         self.failIf(os.path.exists(fn))
317         fileutil.remove_if_possible(fn) # should be idempotent
318         fileutil.rm_dir(basedir)
319         fileutil.remove_if_possible(fn) # should survive errors
320
321     def test_open_or_create(self):
322         basedir = "util/FileUtil/test_open_or_create"
323         fileutil.make_dirs(basedir)
324         fn = os.path.join(basedir, "here")
325         f = fileutil.open_or_create(fn)
326         f.write("stuff.")
327         f.close()
328         f = fileutil.open_or_create(fn)
329         f.seek(0, 2)
330         f.write("more.")
331         f.close()
332         f = open(fn, "r")
333         data = f.read()
334         f.close()
335         self.failUnlessEqual(data, "stuff.more.")
336
337     def test_NamedTemporaryDirectory(self):
338         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
339         fileutil.make_dirs(basedir)
340         td = fileutil.NamedTemporaryDirectory(dir=basedir)
341         name = td.name
342         self.failUnless(basedir in name)
343         self.failUnless(basedir in repr(td))
344         self.failUnless(os.path.isdir(name))
345         del td
346         # it is conceivable that we need to force gc here, but I'm not sure
347         self.failIf(os.path.isdir(name))
348
349     def test_rename(self):
350         basedir = "util/FileUtil/test_rename"
351         fileutil.make_dirs(basedir)
352         self.touch(basedir, "here")
353         fn = os.path.join(basedir, "here")
354         fn2 = os.path.join(basedir, "there")
355         fileutil.rename(fn, fn2)
356         self.failIf(os.path.exists(fn))
357         self.failUnless(os.path.exists(fn2))
358
359     def test_du(self):
360         basedir = "util/FileUtil/test_du"
361         fileutil.make_dirs(basedir)
362         d = os.path.join(basedir, "space-consuming")
363         self.mkdir(d, "a/b")
364         self.touch(d, "a/b/1.txt", data="a"*10)
365         self.touch(d, "a/b/2.txt", data="b"*11)
366         self.mkdir(d, "a/c")
367         self.touch(d, "a/c/1.txt", data="c"*12)
368         self.touch(d, "a/c/2.txt", data="d"*13)
369
370         used = fileutil.du(basedir)
371         self.failUnlessEqual(10+11+12+13, used)
372
class PollMixinTests(unittest.TestCase):
    """Exercises testutil.PollMixin.poll() with two kinds of check function."""

    def setUp(self):
        self.pm = testutil.PollMixin()

    def _check(self, d):
        # attach a callback that fails the test unless the result delivered
        # through d is exactly True, then hand d back to the caller
        def require_true(result):
            self.failUnless(result is True, repr(result))
        d.addCallback(require_true)
        return d

    def test_PollMixin_True(self):
        # the check function answers True on every call
        def always_true():
            return True
        polled = self.pm.poll(check_f=always_true,
                              pollinterval=0.1)
        return self._check(polled)

    def test_PollMixin_False_then_True(self):
        # the check function answers False on its first call, True on its
        # second
        answers = iter([False, True])
        polled = self.pm.poll(check_f=answers.next,
                              pollinterval=0.1)
        return self._check(polled)
392         return self._check(d)