]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
switch from rfc 3548 base-32 to z-base-32 except for tubids/nodeids
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
2 def foo(): pass # keep the line number constant
3
4 import os
5 from twisted.trial import unittest
6
7 from allmydata.util import bencode, idlib, humanreadable, mathutil
8 from allmydata.util import assertutil, fileutil
9
10
11 class IDLib(unittest.TestCase):
12     def test_b2a(self):
13         self.failUnlessEqual(idlib.b2a("\x12\x34"), "ne4y")
14     def test_b2a_or_none(self):
15         self.failUnlessEqual(idlib.b2a_or_none(None), None)
16         self.failUnlessEqual(idlib.b2a_or_none("\x12\x34"), "ne4y")
17     def test_a2b(self):
18         self.failUnlessEqual(idlib.a2b("ne4y"), "\x12\x34")
19         self.failUnlessRaises(AssertionError, idlib.a2b, "b0gus")
20
21 class NoArgumentException(Exception):
22     def __init__(self):
23         pass
24
25 class HumanReadable(unittest.TestCase):
26     def test_repr(self):
27         hr = humanreadable.hr
28         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
29         self.failUnlessEqual(hr(self.test_repr),
30                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
31         self.failUnlessEqual(hr(1L), "1")
32         self.failUnlessEqual(hr(10**40),
33                              "100000000000000000...000000000000000000")
34         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
35         self.failUnlessEqual(hr([1,2]), "[1, 2]")
36         self.failUnlessEqual(hr({1:2}), "{1:2}")
37         try:
38             raise RuntimeError
39         except Exception, e:
40             self.failUnless(
41                 hr(e) == "<RuntimeError: ()>" # python-2.4
42                 or hr(e) == "RuntimeError()") # python-2.5
43         try:
44             raise RuntimeError("oops")
45         except Exception, e:
46             self.failUnless(
47                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
48                 or hr(e) == "RuntimeError('oops',)") # python-2.5
49         try:
50             raise NoArgumentException
51         except Exception, e:
52             self.failUnless(
53                 hr(e) == "<NoArgumentException>" # python-2.4
54                 or hr(e) == "NoArgumentException()") # python-2.5
55
56
57 class MyList(list):
58     pass
59
60 class Bencode(unittest.TestCase):
61     def test_bencode(self):
62         e = bencode.bencode
63         self.failUnlessEqual(e(4), "i4e")
64         self.failUnlessEqual(e([1,2]), "li1ei2ee")
65         self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
66         self.failUnlessEqual(e({1:2}), "di1ei2ee")
67         self.failUnlessEqual(e(u"a"), "u1:a")
68         self.failUnlessEqual(e([True,False]), "lb1b0e")
69         self.failUnlessEqual(e(1.5), "f1.5e")
70         self.failUnlessEqual(e("foo"), "3:foo")
71         d = bencode.bdecode
72         self.failUnlessEqual(d("li1ei2ee"), [1,2])
73         self.failUnlessEqual(d("u1:a"), u"a")
74         self.failUnlessRaises(ValueError, d, "u10:short")
75         self.failUnlessEqual(d("lb1b0e"), [True,False])
76         self.failUnlessRaises(ValueError, d, "b2")
77         self.failUnlessEqual(d("f1.5e"), 1.5)
78         self.failUnlessEqual(d("3:foo"), "foo")
79         self.failUnlessRaises(ValueError, d,
80                               "38:When doing layout, always plan ah")
81         # ooh! fascinating! bdecode requires string keys! I think this ought
82         # to be changed
83         #self.failUnlessEqual(d("di1ei2ee"), {1:2})
84         self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
85         self.failUnlessRaises(ValueError, d, "di1ei2ee")
86         self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
87
88         self.failUnlessRaises(ValueError, d, "i1ei2e")
89
90         # now run all the module's builtin tests
91         bencode.test_decode_raw_string()
92         bencode.test_encode_and_decode_unicode_results_in_unicode_type()
93         bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
94         bencode.test_dict_forbids_non_string_key()
95         bencode.test_dict_forbids_key_repeat()
96         bencode.test_empty_dict()
97         bencode.test_dict_allows_unicode_keys()
98         bencode.test_ValueError_in_decode_unknown()
99         bencode.test_encode_and_decode_none()
100         bencode.test_encode_and_decode_long()
101         bencode.test_encode_and_decode_int()
102         bencode.test_encode_and_decode_float()
103         bencode.test_encode_and_decode_bool()
104         #bencode.test_decode_noncanonical_int()
105         bencode.test_encode_and_decode_dict()
106         bencode.test_encode_and_decode_list()
107         bencode.test_encode_and_decode_tuple()
108         bencode.test_encode_and_decode_empty_dict()
109         bencode.test_encode_and_decode_complex_object()
110         bencode.test_unfinished_list()
111         bencode.test_unfinished_dict()
112         bencode.test_unsupported_type()
113
114 class Math(unittest.TestCase):
115     def test_div_ceil(self):
116         f = mathutil.div_ceil
117         self.failUnlessEqual(f(0, 1), 0)
118         self.failUnlessEqual(f(0, 2), 0)
119         self.failUnlessEqual(f(0, 3), 0)
120         self.failUnlessEqual(f(1, 3), 1)
121         self.failUnlessEqual(f(2, 3), 1)
122         self.failUnlessEqual(f(3, 3), 1)
123         self.failUnlessEqual(f(4, 3), 2)
124         self.failUnlessEqual(f(5, 3), 2)
125         self.failUnlessEqual(f(6, 3), 2)
126         self.failUnlessEqual(f(7, 3), 3)
127
128     def test_next_multiple(self):
129         f = mathutil.next_multiple
130         self.failUnlessEqual(f(5, 1), 5)
131         self.failUnlessEqual(f(5, 2), 6)
132         self.failUnlessEqual(f(5, 3), 6)
133         self.failUnlessEqual(f(5, 4), 8)
134         self.failUnlessEqual(f(5, 5), 5)
135         self.failUnlessEqual(f(5, 6), 6)
136         self.failUnlessEqual(f(32, 1), 32)
137         self.failUnlessEqual(f(32, 2), 32)
138         self.failUnlessEqual(f(32, 3), 33)
139         self.failUnlessEqual(f(32, 4), 32)
140         self.failUnlessEqual(f(32, 5), 35)
141         self.failUnlessEqual(f(32, 6), 36)
142         self.failUnlessEqual(f(32, 7), 35)
143         self.failUnlessEqual(f(32, 8), 32)
144         self.failUnlessEqual(f(32, 9), 36)
145         self.failUnlessEqual(f(32, 10), 40)
146         self.failUnlessEqual(f(32, 11), 33)
147         self.failUnlessEqual(f(32, 12), 36)
148         self.failUnlessEqual(f(32, 13), 39)
149         self.failUnlessEqual(f(32, 14), 42)
150         self.failUnlessEqual(f(32, 15), 45)
151         self.failUnlessEqual(f(32, 16), 32)
152         self.failUnlessEqual(f(32, 17), 34)
153         self.failUnlessEqual(f(32, 18), 36)
154         self.failUnlessEqual(f(32, 589), 589)
155
156     def test_pad_size(self):
157         f = mathutil.pad_size
158         self.failUnlessEqual(f(0, 4), 0)
159         self.failUnlessEqual(f(1, 4), 3)
160         self.failUnlessEqual(f(2, 4), 2)
161         self.failUnlessEqual(f(3, 4), 1)
162         self.failUnlessEqual(f(4, 4), 0)
163         self.failUnlessEqual(f(5, 4), 3)
164
165     def test_is_power_of_k(self):
166         f = mathutil.is_power_of_k
167         for i in range(1, 100):
168             if i in (1, 2, 4, 8, 16, 32, 64):
169                 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
170             else:
171                 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
172         for i in range(1, 100):
173             if i in (1, 3, 9, 27, 81):
174                 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
175             else:
176                 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
177
178     def test_next_power_of_k(self):
179         f = mathutil.next_power_of_k
180         self.failUnlessEqual(f(0,2), 1)
181         self.failUnlessEqual(f(1,2), 1)
182         self.failUnlessEqual(f(2,2), 2)
183         self.failUnlessEqual(f(3,2), 4)
184         self.failUnlessEqual(f(4,2), 4)
185         for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
186         for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
187         for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
188         for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
189         for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
190
191         self.failUnlessEqual(f(0,3), 1)
192         self.failUnlessEqual(f(1,3), 1)
193         self.failUnlessEqual(f(2,3), 3)
194         self.failUnlessEqual(f(3,3), 3)
195         for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
196         for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
197         for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
198         for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
199
200     def test_ave(self):
201         f = mathutil.ave
202         self.failUnlessEqual(f([1,2,3]), 2)
203         self.failUnlessEqual(f([0,0,0,4]), 1)
204         self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
205
206
207 class Asserts(unittest.TestCase):
208     def should_assert(self, func, *args, **kwargs):
209         try:
210             func(*args, **kwargs)
211         except AssertionError, e:
212             return str(e)
213         except Exception, e:
214             self.fail("assert failed with non-AssertionError: %s" % e)
215         self.fail("assert was not caught")
216
217     def should_not_assert(self, func, *args, **kwargs):
218         if "re" in kwargs:
219             regexp = kwargs["re"]
220             del kwargs["re"]
221         try:
222             func(*args, **kwargs)
223         except AssertionError, e:
224             self.fail("assertion fired when it should not have: %s" % e)
225         except Exception, e:
226             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
227         return # we're happy
228
229
230     def test_assert(self):
231         f = assertutil._assert
232         self.should_assert(f)
233         self.should_assert(f, False)
234         self.should_not_assert(f, True)
235
236         m = self.should_assert(f, False, "message")
237         self.failUnlessEqual(m, "'message' <type 'str'>", m)
238         m = self.should_assert(f, False, "message1", othermsg=12)
239         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
240         m = self.should_assert(f, False, othermsg="message2")
241         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
242
243     def test_precondition(self):
244         f = assertutil.precondition
245         self.should_assert(f)
246         self.should_assert(f, False)
247         self.should_not_assert(f, True)
248
249         m = self.should_assert(f, False, "message")
250         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
251         m = self.should_assert(f, False, "message1", othermsg=12)
252         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
253         m = self.should_assert(f, False, othermsg="message2")
254         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
255
256     def test_postcondition(self):
257         f = assertutil.postcondition
258         self.should_assert(f)
259         self.should_assert(f, False)
260         self.should_not_assert(f, True)
261
262         m = self.should_assert(f, False, "message")
263         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
264         m = self.should_assert(f, False, "message1", othermsg=12)
265         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
266         m = self.should_assert(f, False, othermsg="message2")
267         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
268
269 class FileUtil(unittest.TestCase):
270     def mkdir(self, basedir, path, mode=0777):
271         fn = os.path.join(basedir, path)
272         fileutil.make_dirs(fn, mode)
273
274     def touch(self, basedir, path, mode=None, data="touch\n"):
275         fn = os.path.join(basedir, path)
276         f = open(fn, "w")
277         f.write(data)
278         f.close()
279         if mode is not None:
280             os.chmod(fn, mode)
281
282     def test_rm_dir(self):
283         basedir = "util/FileUtil/test_rm_dir"
284         fileutil.make_dirs(basedir)
285         # create it again to test idempotency
286         fileutil.make_dirs(basedir)
287         d = os.path.join(basedir, "doomed")
288         self.mkdir(d, "a/b")
289         self.touch(d, "a/b/1.txt")
290         self.touch(d, "a/b/2.txt", 0444)
291         self.touch(d, "a/b/3.txt", 0)
292         self.mkdir(d, "a/c")
293         self.touch(d, "a/c/1.txt")
294         self.touch(d, "a/c/2.txt", 0444)
295         self.touch(d, "a/c/3.txt", 0)
296         os.chmod(os.path.join(d, "a/c"), 0444)
297         self.mkdir(d, "a/d")
298         self.touch(d, "a/d/1.txt")
299         self.touch(d, "a/d/2.txt", 0444)
300         self.touch(d, "a/d/3.txt", 0)
301         os.chmod(os.path.join(d, "a/d"), 0)
302
303         fileutil.rm_dir(d)
304         self.failIf(os.path.exists(d))
305         # remove it again to test idempotency
306         fileutil.rm_dir(d)
307
308     def test_remove_if_possible(self):
309         basedir = "util/FileUtil/test_remove_if_possible"
310         fileutil.make_dirs(basedir)
311         self.touch(basedir, "here")
312         fn = os.path.join(basedir, "here")
313         fileutil.remove_if_possible(fn)
314         self.failIf(os.path.exists(fn))
315         fileutil.remove_if_possible(fn) # should be idempotent
316         fileutil.rm_dir(basedir)
317         fileutil.remove_if_possible(fn) # should survive errors
318
319     def test_open_or_create(self):
320         basedir = "util/FileUtil/test_open_or_create"
321         fileutil.make_dirs(basedir)
322         fn = os.path.join(basedir, "here")
323         f = fileutil.open_or_create(fn)
324         f.write("stuff.")
325         f.close()
326         f = fileutil.open_or_create(fn)
327         f.seek(0, 2)
328         f.write("more.")
329         f.close()
330         f = open(fn, "r")
331         data = f.read()
332         f.close()
333         self.failUnlessEqual(data, "stuff.more.")
334
335     def test_NamedTemporaryDirectory(self):
336         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
337         fileutil.make_dirs(basedir)
338         td = fileutil.NamedTemporaryDirectory(dir=basedir)
339         name = td.name
340         self.failUnless(basedir in name)
341         self.failUnless(basedir in repr(td))
342         self.failUnless(os.path.isdir(name))
343         del td
344         # it is conceivable that we need to force gc here, but I'm not sure
345         self.failIf(os.path.isdir(name))
346
347     def test_rename(self):
348         basedir = "util/FileUtil/test_rename"
349         fileutil.make_dirs(basedir)
350         self.touch(basedir, "here")
351         fn = os.path.join(basedir, "here")
352         fn2 = os.path.join(basedir, "there")
353         fileutil.rename(fn, fn2)
354         self.failIf(os.path.exists(fn))
355         self.failUnless(os.path.exists(fn2))
356
357     def test_du(self):
358         basedir = "util/FileUtil/test_du"
359         fileutil.make_dirs(basedir)
360         d = os.path.join(basedir, "space-consuming")
361         self.mkdir(d, "a/b")
362         self.touch(d, "a/b/1.txt", data="a"*10)
363         self.touch(d, "a/b/2.txt", data="b"*11)
364         self.mkdir(d, "a/c")
365         self.touch(d, "a/c/1.txt", data="c"*12)
366         self.touch(d, "a/c/2.txt", data="d"*13)
367
368         used = fileutil.du(basedir)
369         self.failUnlessEqual(10+11+12+13, used)
370