]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
test_util: improve test coverage of allmydata.util.fileutil
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
2 def foo(): pass # keep the line number constant
3
4 import os
5 from twisted.trial import unittest
6
7 from allmydata.util import bencode, idlib, humanreadable, mathutil
8 from allmydata.util import assertutil, fileutil
9
10
11 class IDLib(unittest.TestCase):
12     def test_b2a(self):
13         self.failUnlessEqual(idlib.b2a("\x12\x34"), "ci2a====")
14     def test_b2a_or_none(self):
15         self.failUnlessEqual(idlib.b2a_or_none(None), None)
16         self.failUnlessEqual(idlib.b2a_or_none("\x12\x34"), "ci2a====")
17     def test_a2b(self):
18         self.failUnlessEqual(idlib.a2b("ci2a===="), "\x12\x34")
19         self.failUnlessRaises(TypeError, idlib.a2b, "bogus")
20     def test_peerid(self):
21         # these are 160-bit numbers
22         peerid = "\x80" + "\x00" * 19
23         short = idlib.peerid_to_short_string(peerid)
24         self.failUnlessEqual(short, "qaaa")
25
26 class NoArgumentException(Exception):
27     def __init__(self):
28         pass
29
30 class HumanReadable(unittest.TestCase):
31     def test_repr(self):
32         hr = humanreadable.hr
33         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
34         self.failUnlessEqual(hr(self.test_repr),
35                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
36         self.failUnlessEqual(hr(1L), "1")
37         self.failUnlessEqual(hr(10**40),
38                              "100000000000000000...000000000000000000")
39         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
40         self.failUnlessEqual(hr([1,2]), "[1, 2]")
41         self.failUnlessEqual(hr({1:2}), "{1:2}")
42         try:
43             raise RuntimeError
44         except Exception, e:
45             self.failUnless(
46                 hr(e) == "<RuntimeError: ()>" # python-2.4
47                 or hr(e) == "RuntimeError()") # python-2.5
48         try:
49             raise RuntimeError("oops")
50         except Exception, e:
51             self.failUnless(
52                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
53                 or hr(e) == "RuntimeError('oops',)") # python-2.5
54         try:
55             raise NoArgumentException
56         except Exception, e:
57             self.failUnless(
58                 hr(e) == "<NoArgumentException>" # python-2.4
59                 or hr(e) == "NoArgumentException()") # python-2.5
60
61
62 class MyList(list):
63     pass
64
65 class Bencode(unittest.TestCase):
66     def test_bencode(self):
67         e = bencode.bencode
68         self.failUnlessEqual(e(4), "i4e")
69         self.failUnlessEqual(e([1,2]), "li1ei2ee")
70         self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
71         self.failUnlessEqual(e({1:2}), "di1ei2ee")
72         self.failUnlessEqual(e(u"a"), "u1:a")
73         self.failUnlessEqual(e([True,False]), "lb1b0e")
74         self.failUnlessEqual(e(1.5), "f1.5e")
75         self.failUnlessEqual(e("foo"), "3:foo")
76         d = bencode.bdecode
77         self.failUnlessEqual(d("li1ei2ee"), [1,2])
78         self.failUnlessEqual(d("u1:a"), u"a")
79         self.failUnlessRaises(ValueError, d, "u10:short")
80         self.failUnlessEqual(d("lb1b0e"), [True,False])
81         self.failUnlessRaises(ValueError, d, "b2")
82         self.failUnlessEqual(d("f1.5e"), 1.5)
83         self.failUnlessEqual(d("3:foo"), "foo")
84         self.failUnlessRaises(ValueError, d,
85                               "38:When doing layout, always plan ah")
86         # ooh! fascinating! bdecode requires string keys! I think this ought
87         # to be changed
88         #self.failUnlessEqual(d("di1ei2ee"), {1:2})
89         self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
90         self.failUnlessRaises(ValueError, d, "di1ei2ee")
91         self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
92
93         self.failUnlessRaises(ValueError, d, "i1ei2e")
94
95         # now run all the module's builtin tests
96         bencode.test_decode_raw_string()
97         bencode.test_encode_and_decode_unicode_results_in_unicode_type()
98         bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
99         bencode.test_dict_forbids_non_string_key()
100         bencode.test_dict_forbids_key_repeat()
101         bencode.test_empty_dict()
102         bencode.test_dict_allows_unicode_keys()
103         bencode.test_ValueError_in_decode_unknown()
104         bencode.test_encode_and_decode_none()
105         bencode.test_encode_and_decode_long()
106         bencode.test_encode_and_decode_int()
107         bencode.test_encode_and_decode_float()
108         bencode.test_encode_and_decode_bool()
109         #bencode.test_decode_noncanonical_int()
110         bencode.test_encode_and_decode_dict()
111         bencode.test_encode_and_decode_list()
112         bencode.test_encode_and_decode_tuple()
113         bencode.test_encode_and_decode_empty_dict()
114         bencode.test_encode_and_decode_complex_object()
115         bencode.test_unfinished_list()
116         bencode.test_unfinished_dict()
117         bencode.test_unsupported_type()
118
119 class Math(unittest.TestCase):
120     def test_div_ceil(self):
121         f = mathutil.div_ceil
122         self.failUnlessEqual(f(0, 1), 0)
123         self.failUnlessEqual(f(0, 2), 0)
124         self.failUnlessEqual(f(0, 3), 0)
125         self.failUnlessEqual(f(1, 3), 1)
126         self.failUnlessEqual(f(2, 3), 1)
127         self.failUnlessEqual(f(3, 3), 1)
128         self.failUnlessEqual(f(4, 3), 2)
129         self.failUnlessEqual(f(5, 3), 2)
130         self.failUnlessEqual(f(6, 3), 2)
131         self.failUnlessEqual(f(7, 3), 3)
132
133     def test_next_multiple(self):
134         f = mathutil.next_multiple
135         self.failUnlessEqual(f(5, 1), 5)
136         self.failUnlessEqual(f(5, 2), 6)
137         self.failUnlessEqual(f(5, 3), 6)
138         self.failUnlessEqual(f(5, 4), 8)
139         self.failUnlessEqual(f(5, 5), 5)
140         self.failUnlessEqual(f(5, 6), 6)
141         self.failUnlessEqual(f(32, 1), 32)
142         self.failUnlessEqual(f(32, 2), 32)
143         self.failUnlessEqual(f(32, 3), 33)
144         self.failUnlessEqual(f(32, 4), 32)
145         self.failUnlessEqual(f(32, 5), 35)
146         self.failUnlessEqual(f(32, 6), 36)
147         self.failUnlessEqual(f(32, 7), 35)
148         self.failUnlessEqual(f(32, 8), 32)
149         self.failUnlessEqual(f(32, 9), 36)
150         self.failUnlessEqual(f(32, 10), 40)
151         self.failUnlessEqual(f(32, 11), 33)
152         self.failUnlessEqual(f(32, 12), 36)
153         self.failUnlessEqual(f(32, 13), 39)
154         self.failUnlessEqual(f(32, 14), 42)
155         self.failUnlessEqual(f(32, 15), 45)
156         self.failUnlessEqual(f(32, 16), 32)
157         self.failUnlessEqual(f(32, 17), 34)
158         self.failUnlessEqual(f(32, 18), 36)
159         self.failUnlessEqual(f(32, 589), 589)
160
161     def test_pad_size(self):
162         f = mathutil.pad_size
163         self.failUnlessEqual(f(0, 4), 0)
164         self.failUnlessEqual(f(1, 4), 3)
165         self.failUnlessEqual(f(2, 4), 2)
166         self.failUnlessEqual(f(3, 4), 1)
167         self.failUnlessEqual(f(4, 4), 0)
168         self.failUnlessEqual(f(5, 4), 3)
169
170     def test_is_power_of_k(self):
171         f = mathutil.is_power_of_k
172         for i in range(1, 100):
173             if i in (1, 2, 4, 8, 16, 32, 64):
174                 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
175             else:
176                 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
177         for i in range(1, 100):
178             if i in (1, 3, 9, 27, 81):
179                 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
180             else:
181                 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
182
183     def test_next_power_of_k(self):
184         f = mathutil.next_power_of_k
185         self.failUnlessEqual(f(0,2), 1)
186         self.failUnlessEqual(f(1,2), 1)
187         self.failUnlessEqual(f(2,2), 2)
188         self.failUnlessEqual(f(3,2), 4)
189         self.failUnlessEqual(f(4,2), 4)
190         for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
191         for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
192         for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
193         for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
194         for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
195
196         self.failUnlessEqual(f(0,3), 1)
197         self.failUnlessEqual(f(1,3), 1)
198         self.failUnlessEqual(f(2,3), 3)
199         self.failUnlessEqual(f(3,3), 3)
200         for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
201         for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
202         for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
203         for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
204
205     def test_ave(self):
206         f = mathutil.ave
207         self.failUnlessEqual(f([1,2,3]), 2)
208         self.failUnlessEqual(f([0,0,0,4]), 1)
209         self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
210
211
212 class Asserts(unittest.TestCase):
213     def should_assert(self, func, *args, **kwargs):
214         try:
215             func(*args, **kwargs)
216         except AssertionError, e:
217             return str(e)
218         except Exception, e:
219             self.fail("assert failed with non-AssertionError: %s" % e)
220         self.fail("assert was not caught")
221
222     def should_not_assert(self, func, *args, **kwargs):
223         if "re" in kwargs:
224             regexp = kwargs["re"]
225             del kwargs["re"]
226         try:
227             func(*args, **kwargs)
228         except AssertionError, e:
229             self.fail("assertion fired when it should not have: %s" % e)
230         except Exception, e:
231             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
232         return # we're happy
233
234
235     def test_assert(self):
236         f = assertutil._assert
237         self.should_assert(f)
238         self.should_assert(f, False)
239         self.should_not_assert(f, True)
240
241         m = self.should_assert(f, False, "message")
242         self.failUnlessEqual(m, "'message' <type 'str'>", m)
243         m = self.should_assert(f, False, "message1", othermsg=12)
244         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
245         m = self.should_assert(f, False, othermsg="message2")
246         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
247
248     def test_precondition(self):
249         f = assertutil.precondition
250         self.should_assert(f)
251         self.should_assert(f, False)
252         self.should_not_assert(f, True)
253
254         m = self.should_assert(f, False, "message")
255         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
256         m = self.should_assert(f, False, "message1", othermsg=12)
257         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
258         m = self.should_assert(f, False, othermsg="message2")
259         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
260
261     def test_postcondition(self):
262         f = assertutil.postcondition
263         self.should_assert(f)
264         self.should_assert(f, False)
265         self.should_not_assert(f, True)
266
267         m = self.should_assert(f, False, "message")
268         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
269         m = self.should_assert(f, False, "message1", othermsg=12)
270         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
271         m = self.should_assert(f, False, othermsg="message2")
272         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
273
274 class FileUtil(unittest.TestCase):
275     def mkdir(self, basedir, path, mode=0777):
276         fn = os.path.join(basedir, path)
277         fileutil.make_dirs(fn, mode)
278
279     def touch(self, basedir, path, mode=None):
280         fn = os.path.join(basedir, path)
281         f = open(fn, "w")
282         f.write("touch\n")
283         f.close()
284         if mode is not None:
285             os.chmod(fn, mode)
286
287     def test_rm_dir(self):
288         basedir = "util/FileUtil/test_rm_dir"
289         fileutil.make_dirs(basedir)
290         # create it again to test idempotency
291         fileutil.make_dirs(basedir)
292         d = os.path.join(basedir, "doomed")
293         self.mkdir(d, "a/b")
294         self.touch(d, "a/b/1.txt")
295         self.touch(d, "a/b/2.txt", 0444)
296         self.touch(d, "a/b/3.txt", 0)
297         self.mkdir(d, "a/c")
298         self.touch(d, "a/c/1.txt")
299         self.touch(d, "a/c/2.txt", 0444)
300         self.touch(d, "a/c/3.txt", 0)
301         os.chmod(os.path.join(d, "a/c"), 0444)
302         self.mkdir(d, "a/d")
303         self.touch(d, "a/d/1.txt")
304         self.touch(d, "a/d/2.txt", 0444)
305         self.touch(d, "a/d/3.txt", 0)
306         os.chmod(os.path.join(d, "a/d"), 0)
307
308         fileutil.rm_dir(d)
309         self.failIf(os.path.exists(d))
310         # remove it again to test idempotency
311         fileutil.rm_dir(d)
312
313     def test_remove_if_possible(self):
314         basedir = "util/FileUtil/test_remove_if_possible"
315         fileutil.make_dirs(basedir)
316         self.touch(basedir, "here")
317         fn = os.path.join(basedir, "here")
318         fileutil.remove_if_possible(fn)
319         self.failIf(os.path.exists(fn))
320         fileutil.remove_if_possible(fn) # should be idempotent
321         fileutil.rm_dir(basedir)
322         fileutil.remove_if_possible(fn) # should survive errors
323
324     def test_open_or_create(self):
325         basedir = "util/FileUtil/test_open_or_create"
326         fileutil.make_dirs(basedir)
327         fn = os.path.join(basedir, "here")
328         f = fileutil.open_or_create(fn)
329         f.write("stuff.")
330         f.close()
331         f = fileutil.open_or_create(fn)
332         f.seek(0, 2)
333         f.write("more.")
334         f.close()
335         f = open(fn, "r")
336         data = f.read()
337         f.close()
338         self.failUnlessEqual(data, "stuff.more.")
339
340     def test_NamedTemporaryDirectory(self):
341         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
342         fileutil.make_dirs(basedir)
343         td = fileutil.NamedTemporaryDirectory(dir=basedir)
344         name = td.name
345         self.failUnless(basedir in name)
346         self.failUnless(basedir in repr(td))
347         self.failUnless(os.path.isdir(name))
348         del td
349         # it is conceivable that we need to force gc here, but I'm not sure
350         self.failIf(os.path.isdir(name))
351
352     def test_rename(self):
353         basedir = "util/FileUtil/test_rename"
354         fileutil.make_dirs(basedir)
355         self.touch(basedir, "here")
356         fn = os.path.join(basedir, "here")
357         fn2 = os.path.join(basedir, "there")
358         fileutil.rename(fn, fn2)
359         self.failIf(os.path.exists(fn))
360         self.failUnless(os.path.exists(fn2))