]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
switch from base62 to base32 for storage indices, switch from z-base-32 to rfc 3548...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
2 def foo(): pass # keep the line number constant
3
4 import os
5 from twisted.trial import unittest
6 from twisted.internet import defer
7 from twisted.python import failure
8
9 from allmydata.util import base32, bencode, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, testutil, deferredutil
11
12
13 class Base32(unittest.TestCase):
14     def test_b2a(self):
15         self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
16     def test_b2a_or_none(self):
17         self.failUnlessEqual(base32.b2a_or_none(None), None)
18         self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
19     def test_a2b(self):
20         self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
21         self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
22
23 class IDLib(unittest.TestCase):
24     def test_nodeid_b2a(self):
25         self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
26
27 class NoArgumentException(Exception):
28     def __init__(self):
29         pass
30
31 class HumanReadable(unittest.TestCase):
32     def test_repr(self):
33         hr = humanreadable.hr
34         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
35         self.failUnlessEqual(hr(self.test_repr),
36                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
37         self.failUnlessEqual(hr(1L), "1")
38         self.failUnlessEqual(hr(10**40),
39                              "100000000000000000...000000000000000000")
40         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
41         self.failUnlessEqual(hr([1,2]), "[1, 2]")
42         self.failUnlessEqual(hr({1:2}), "{1:2}")
43         try:
44             raise RuntimeError
45         except Exception, e:
46             self.failUnless(
47                 hr(e) == "<RuntimeError: ()>" # python-2.4
48                 or hr(e) == "RuntimeError()") # python-2.5
49         try:
50             raise RuntimeError("oops")
51         except Exception, e:
52             self.failUnless(
53                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
54                 or hr(e) == "RuntimeError('oops',)") # python-2.5
55         try:
56             raise NoArgumentException
57         except Exception, e:
58             self.failUnless(
59                 hr(e) == "<NoArgumentException>" # python-2.4
60                 or hr(e) == "NoArgumentException()") # python-2.5
61
62
63 class MyList(list):
64     pass
65
66 class Bencode(unittest.TestCase):
67     def test_bencode(self):
68         e = bencode.bencode
69         self.failUnlessEqual(e(4), "i4e")
70         self.failUnlessEqual(e([1,2]), "li1ei2ee")
71         self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
72         self.failUnlessEqual(e({1:2}), "di1ei2ee")
73         self.failUnlessEqual(e(u"a"), "u1:a")
74         self.failUnlessEqual(e([True,False]), "lb1b0e")
75         self.failUnlessEqual(e(1.5), "f1.5e")
76         self.failUnlessEqual(e("foo"), "3:foo")
77         d = bencode.bdecode
78         self.failUnlessEqual(d("li1ei2ee"), [1,2])
79         self.failUnlessEqual(d("u1:a"), u"a")
80         self.failUnlessRaises(ValueError, d, "u10:short")
81         self.failUnlessEqual(d("lb1b0e"), [True,False])
82         self.failUnlessRaises(ValueError, d, "b2")
83         self.failUnlessEqual(d("f1.5e"), 1.5)
84         self.failUnlessEqual(d("3:foo"), "foo")
85         self.failUnlessRaises(ValueError, d,
86                               "38:When doing layout, always plan ah")
87         # ooh! fascinating! bdecode requires string keys! I think this ought
88         # to be changed
89         #self.failUnlessEqual(d("di1ei2ee"), {1:2})
90         self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
91         self.failUnlessRaises(ValueError, d, "di1ei2ee")
92         self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
93
94         self.failUnlessRaises(ValueError, d, "i1ei2e")
95
96         # now run all the module's builtin tests
97         bencode.test_decode_raw_string()
98         bencode.test_encode_and_decode_unicode_results_in_unicode_type()
99         bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
100         bencode.test_dict_forbids_non_string_key()
101         bencode.test_dict_forbids_key_repeat()
102         bencode.test_empty_dict()
103         bencode.test_dict_allows_unicode_keys()
104         bencode.test_ValueError_in_decode_unknown()
105         bencode.test_encode_and_decode_none()
106         bencode.test_encode_and_decode_long()
107         bencode.test_encode_and_decode_int()
108         bencode.test_encode_and_decode_float()
109         bencode.test_encode_and_decode_bool()
110         #bencode.test_decode_noncanonical_int()
111         bencode.test_encode_and_decode_dict()
112         bencode.test_encode_and_decode_list()
113         bencode.test_encode_and_decode_tuple()
114         bencode.test_encode_and_decode_empty_dict()
115         bencode.test_encode_and_decode_complex_object()
116         bencode.test_unfinished_list()
117         bencode.test_unfinished_dict()
118         bencode.test_unsupported_type()
119
120 class Math(unittest.TestCase):
121     def test_div_ceil(self):
122         f = mathutil.div_ceil
123         self.failUnlessEqual(f(0, 1), 0)
124         self.failUnlessEqual(f(0, 2), 0)
125         self.failUnlessEqual(f(0, 3), 0)
126         self.failUnlessEqual(f(1, 3), 1)
127         self.failUnlessEqual(f(2, 3), 1)
128         self.failUnlessEqual(f(3, 3), 1)
129         self.failUnlessEqual(f(4, 3), 2)
130         self.failUnlessEqual(f(5, 3), 2)
131         self.failUnlessEqual(f(6, 3), 2)
132         self.failUnlessEqual(f(7, 3), 3)
133
134     def test_next_multiple(self):
135         f = mathutil.next_multiple
136         self.failUnlessEqual(f(5, 1), 5)
137         self.failUnlessEqual(f(5, 2), 6)
138         self.failUnlessEqual(f(5, 3), 6)
139         self.failUnlessEqual(f(5, 4), 8)
140         self.failUnlessEqual(f(5, 5), 5)
141         self.failUnlessEqual(f(5, 6), 6)
142         self.failUnlessEqual(f(32, 1), 32)
143         self.failUnlessEqual(f(32, 2), 32)
144         self.failUnlessEqual(f(32, 3), 33)
145         self.failUnlessEqual(f(32, 4), 32)
146         self.failUnlessEqual(f(32, 5), 35)
147         self.failUnlessEqual(f(32, 6), 36)
148         self.failUnlessEqual(f(32, 7), 35)
149         self.failUnlessEqual(f(32, 8), 32)
150         self.failUnlessEqual(f(32, 9), 36)
151         self.failUnlessEqual(f(32, 10), 40)
152         self.failUnlessEqual(f(32, 11), 33)
153         self.failUnlessEqual(f(32, 12), 36)
154         self.failUnlessEqual(f(32, 13), 39)
155         self.failUnlessEqual(f(32, 14), 42)
156         self.failUnlessEqual(f(32, 15), 45)
157         self.failUnlessEqual(f(32, 16), 32)
158         self.failUnlessEqual(f(32, 17), 34)
159         self.failUnlessEqual(f(32, 18), 36)
160         self.failUnlessEqual(f(32, 589), 589)
161
162     def test_pad_size(self):
163         f = mathutil.pad_size
164         self.failUnlessEqual(f(0, 4), 0)
165         self.failUnlessEqual(f(1, 4), 3)
166         self.failUnlessEqual(f(2, 4), 2)
167         self.failUnlessEqual(f(3, 4), 1)
168         self.failUnlessEqual(f(4, 4), 0)
169         self.failUnlessEqual(f(5, 4), 3)
170
171     def test_is_power_of_k(self):
172         f = mathutil.is_power_of_k
173         for i in range(1, 100):
174             if i in (1, 2, 4, 8, 16, 32, 64):
175                 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
176             else:
177                 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
178         for i in range(1, 100):
179             if i in (1, 3, 9, 27, 81):
180                 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
181             else:
182                 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
183
184     def test_next_power_of_k(self):
185         f = mathutil.next_power_of_k
186         self.failUnlessEqual(f(0,2), 1)
187         self.failUnlessEqual(f(1,2), 1)
188         self.failUnlessEqual(f(2,2), 2)
189         self.failUnlessEqual(f(3,2), 4)
190         self.failUnlessEqual(f(4,2), 4)
191         for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
192         for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
193         for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
194         for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
195         for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
196
197         self.failUnlessEqual(f(0,3), 1)
198         self.failUnlessEqual(f(1,3), 1)
199         self.failUnlessEqual(f(2,3), 3)
200         self.failUnlessEqual(f(3,3), 3)
201         for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
202         for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
203         for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
204         for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
205
206     def test_ave(self):
207         f = mathutil.ave
208         self.failUnlessEqual(f([1,2,3]), 2)
209         self.failUnlessEqual(f([0,0,0,4]), 1)
210         self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
211
212
213 class Asserts(unittest.TestCase):
214     def should_assert(self, func, *args, **kwargs):
215         try:
216             func(*args, **kwargs)
217         except AssertionError, e:
218             return str(e)
219         except Exception, e:
220             self.fail("assert failed with non-AssertionError: %s" % e)
221         self.fail("assert was not caught")
222
223     def should_not_assert(self, func, *args, **kwargs):
224         if "re" in kwargs:
225             regexp = kwargs["re"]
226             del kwargs["re"]
227         try:
228             func(*args, **kwargs)
229         except AssertionError, e:
230             self.fail("assertion fired when it should not have: %s" % e)
231         except Exception, e:
232             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
233         return # we're happy
234
235
236     def test_assert(self):
237         f = assertutil._assert
238         self.should_assert(f)
239         self.should_assert(f, False)
240         self.should_not_assert(f, True)
241
242         m = self.should_assert(f, False, "message")
243         self.failUnlessEqual(m, "'message' <type 'str'>", m)
244         m = self.should_assert(f, False, "message1", othermsg=12)
245         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
246         m = self.should_assert(f, False, othermsg="message2")
247         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
248
249     def test_precondition(self):
250         f = assertutil.precondition
251         self.should_assert(f)
252         self.should_assert(f, False)
253         self.should_not_assert(f, True)
254
255         m = self.should_assert(f, False, "message")
256         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
257         m = self.should_assert(f, False, "message1", othermsg=12)
258         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
259         m = self.should_assert(f, False, othermsg="message2")
260         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
261
262     def test_postcondition(self):
263         f = assertutil.postcondition
264         self.should_assert(f)
265         self.should_assert(f, False)
266         self.should_not_assert(f, True)
267
268         m = self.should_assert(f, False, "message")
269         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
270         m = self.should_assert(f, False, "message1", othermsg=12)
271         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
272         m = self.should_assert(f, False, othermsg="message2")
273         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
274
275 class FileUtil(unittest.TestCase):
276     def mkdir(self, basedir, path, mode=0777):
277         fn = os.path.join(basedir, path)
278         fileutil.make_dirs(fn, mode)
279
280     def touch(self, basedir, path, mode=None, data="touch\n"):
281         fn = os.path.join(basedir, path)
282         f = open(fn, "w")
283         f.write(data)
284         f.close()
285         if mode is not None:
286             os.chmod(fn, mode)
287
288     def test_rm_dir(self):
289         basedir = "util/FileUtil/test_rm_dir"
290         fileutil.make_dirs(basedir)
291         # create it again to test idempotency
292         fileutil.make_dirs(basedir)
293         d = os.path.join(basedir, "doomed")
294         self.mkdir(d, "a/b")
295         self.touch(d, "a/b/1.txt")
296         self.touch(d, "a/b/2.txt", 0444)
297         self.touch(d, "a/b/3.txt", 0)
298         self.mkdir(d, "a/c")
299         self.touch(d, "a/c/1.txt")
300         self.touch(d, "a/c/2.txt", 0444)
301         self.touch(d, "a/c/3.txt", 0)
302         os.chmod(os.path.join(d, "a/c"), 0444)
303         self.mkdir(d, "a/d")
304         self.touch(d, "a/d/1.txt")
305         self.touch(d, "a/d/2.txt", 0444)
306         self.touch(d, "a/d/3.txt", 0)
307         os.chmod(os.path.join(d, "a/d"), 0)
308
309         fileutil.rm_dir(d)
310         self.failIf(os.path.exists(d))
311         # remove it again to test idempotency
312         fileutil.rm_dir(d)
313
314     def test_remove_if_possible(self):
315         basedir = "util/FileUtil/test_remove_if_possible"
316         fileutil.make_dirs(basedir)
317         self.touch(basedir, "here")
318         fn = os.path.join(basedir, "here")
319         fileutil.remove_if_possible(fn)
320         self.failIf(os.path.exists(fn))
321         fileutil.remove_if_possible(fn) # should be idempotent
322         fileutil.rm_dir(basedir)
323         fileutil.remove_if_possible(fn) # should survive errors
324
325     def test_open_or_create(self):
326         basedir = "util/FileUtil/test_open_or_create"
327         fileutil.make_dirs(basedir)
328         fn = os.path.join(basedir, "here")
329         f = fileutil.open_or_create(fn)
330         f.write("stuff.")
331         f.close()
332         f = fileutil.open_or_create(fn)
333         f.seek(0, 2)
334         f.write("more.")
335         f.close()
336         f = open(fn, "r")
337         data = f.read()
338         f.close()
339         self.failUnlessEqual(data, "stuff.more.")
340
341     def test_NamedTemporaryDirectory(self):
342         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
343         fileutil.make_dirs(basedir)
344         td = fileutil.NamedTemporaryDirectory(dir=basedir)
345         name = td.name
346         self.failUnless(basedir in name)
347         self.failUnless(basedir in repr(td))
348         self.failUnless(os.path.isdir(name))
349         del td
350         # it is conceivable that we need to force gc here, but I'm not sure
351         self.failIf(os.path.isdir(name))
352
353     def test_rename(self):
354         basedir = "util/FileUtil/test_rename"
355         fileutil.make_dirs(basedir)
356         self.touch(basedir, "here")
357         fn = os.path.join(basedir, "here")
358         fn2 = os.path.join(basedir, "there")
359         fileutil.rename(fn, fn2)
360         self.failIf(os.path.exists(fn))
361         self.failUnless(os.path.exists(fn2))
362
363     def test_du(self):
364         basedir = "util/FileUtil/test_du"
365         fileutil.make_dirs(basedir)
366         d = os.path.join(basedir, "space-consuming")
367         self.mkdir(d, "a/b")
368         self.touch(d, "a/b/1.txt", data="a"*10)
369         self.touch(d, "a/b/2.txt", data="b"*11)
370         self.mkdir(d, "a/c")
371         self.touch(d, "a/c/1.txt", data="c"*12)
372         self.touch(d, "a/c/2.txt", data="d"*13)
373
374         used = fileutil.du(basedir)
375         self.failUnlessEqual(10+11+12+13, used)
376
377 class PollMixinTests(unittest.TestCase):
378     def setUp(self):
379         self.pm = testutil.PollMixin()
380
381     def test_PollMixin_True(self):
382         d = self.pm.poll(check_f=lambda : True,
383                          pollinterval=0.1)
384         return d
385
386     def test_PollMixin_False_then_True(self):
387         i = iter([False, True])
388         d = self.pm.poll(check_f=i.next,
389                          pollinterval=0.1)
390         return d
391
392     def test_timeout(self):
393         d = self.pm.poll(check_f=lambda: False,
394                          pollinterval=0.01,
395                          timeout=1)
396         def _suc(res):
397             self.fail("poll should have failed, not returned %s" % (res,))
398         def _err(f):
399             f.trap(testutil.TimeoutError)
400             return None # success
401         d.addCallbacks(_suc, _err)
402         return d
403
404 class DeferredUtilTests(unittest.TestCase):
405     def test_success(self):
406         d1, d2 = defer.Deferred(), defer.Deferred()
407         good = []
408         bad = []
409         dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
410         dlss.addCallbacks(good.append, bad.append)
411         d1.callback(1)
412         d2.callback(2)
413         self.failUnlessEqual(good, [[1,2]])
414         self.failUnlessEqual(bad, [])
415
416     def test_failure(self):
417         d1, d2 = defer.Deferred(), defer.Deferred()
418         good = []
419         bad = []
420         dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
421         dlss.addCallbacks(good.append, bad.append)
422         d1.addErrback(lambda _ignore: None)
423         d2.addErrback(lambda _ignore: None)
424         d1.callback(1)
425         d2.errback(RuntimeError())
426         self.failUnlessEqual(good, [])
427         self.failUnlessEqual(len(bad), 1)
428         f = bad[0]
429         self.failUnless(isinstance(f, failure.Failure))
430         self.failUnless(f.check(RuntimeError))
431
432 class HashUtilTests(unittest.TestCase):
433     def test_sha256d(self):
434         h1 = hashutil.tagged_hash_256d("tag1", "value")
435         h2 = hashutil.tagged_hasher_256d("tag1")
436         h2.update("value")
437         h2 = h2.digest()
438         self.failUnlessEqual(h1, h2)
439
440     def test_sha256d_truncated(self):
441         h1 = hashutil.tagged_hash_256d("tag1", "value", 16)
442         h2 = hashutil.tagged_hasher_256d("tag1", 16)
443         h2.update("value")
444         h2 = h2.digest()
445         self.failUnlessEqual(len(h1), 16)
446         self.failUnlessEqual(len(h2), 16)
447         self.failUnlessEqual(h1, h2)
448
449     def test_chk(self):
450         h1 = hashutil.content_hash_key_hash(3, 10, 1000, "data")
451         h2 = hashutil.content_hash_key_hasher(3, 10, 1000)
452         h2.update("data")
453         h2 = h2.digest()
454         self.failUnlessEqual(h1, h2)