]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
Statistics module
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
def foo(): pass # must stay on line 2 of this file: HumanReadable.test_repr expects "test_util.py:2"
3
4 import os, time, random
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import failure
8
9 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
11 from allmydata.util import limiter, time_format, pollmixin, cachedir
12 from allmydata.util import statistics
13
class Base32(unittest.TestCase):
    """Tests for the allmydata.util.base32 encoder and decoder."""

    def test_b2a_matches_Pythons(self):
        # base32.b2a should agree with the stdlib encoder once the stdlib
        # output has its trailing '=' padding removed and is lowercased.
        import base64
        raw = "\x12\x34\x45\x67\x89\x0a\xbc\xde\xf0"
        expected = base64.b32encode(raw).rstrip('=').lower()
        self.failUnlessEqual(base32.b2a(raw), expected)

    def test_b2a(self):
        encoded = base32.b2a("\x12\x34")
        self.failUnlessEqual(encoded, "ci2a")

    def test_b2a_or_none(self):
        encode = base32.b2a_or_none
        # None is expected to pass through unchanged.
        self.failUnlessEqual(encode(None), None)
        self.failUnlessEqual(encode("\x12\x34"), "ci2a")

    def test_a2b(self):
        decode = base32.a2b
        self.failUnlessEqual(decode("ci2a"), "\x12\x34")
        # The string "b0gus" is expected to be rejected with AssertionError.
        self.failUnlessRaises(AssertionError, decode, "b0gus")
31
class IDLib(unittest.TestCase):
    """Tests for allmydata.util.idlib."""

    def test_nodeid_b2a(self):
        # Twenty zero bytes are expected to render as thirty-two "a"s.
        all_zero_nodeid = "\x00" * 20
        rendered = idlib.nodeid_b2a(all_zero_nodeid)
        self.failUnlessEqual(rendered, "a" * 32)
35
class NoArgumentException(Exception):
    # Exception subclass whose constructor accepts no arguments and does not
    # call Exception.__init__. HumanReadable.test_repr raises it to check how
    # humanreadable.hr() renders such an exception.
    def __init__(self):
        pass
39
40 class HumanReadable(unittest.TestCase):
41     def test_repr(self):
42         hr = humanreadable.hr
43         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
44         self.failUnlessEqual(hr(self.test_repr),
45                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
46         self.failUnlessEqual(hr(1L), "1")
47         self.failUnlessEqual(hr(10**40),
48                              "100000000000000000...000000000000000000")
49         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
50         self.failUnlessEqual(hr([1,2]), "[1, 2]")
51         self.failUnlessEqual(hr({1:2}), "{1:2}")
52         try:
53             raise RuntimeError
54         except Exception, e:
55             self.failUnless(
56                 hr(e) == "<RuntimeError: ()>" # python-2.4
57                 or hr(e) == "RuntimeError()") # python-2.5
58         try:
59             raise RuntimeError("oops")
60         except Exception, e:
61             self.failUnless(
62                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
63                 or hr(e) == "RuntimeError('oops',)") # python-2.5
64         try:
65             raise NoArgumentException
66         except Exception, e:
67             self.failUnless(
68                 hr(e) == "<NoArgumentException>" # python-2.4
69                 or hr(e) == "NoArgumentException()") # python-2.5
70
71
class MyList(list):
    """A list subclass that adds no behaviour of its own."""
74
class Math(unittest.TestCase):
    """Tests for the helpers in allmydata.util.mathutil."""

    def test_div_ceil(self):
        f = mathutil.div_ceil
        # (n, d, expected)
        cases = [
            (0, 1, 0), (0, 2, 0), (0, 3, 0),
            (1, 3, 1), (2, 3, 1), (3, 3, 1),
            (4, 3, 2), (5, 3, 2), (6, 3, 2),
            (7, 3, 3),
            ]
        for (n, d, expected) in cases:
            self.failUnlessEqual(f(n, d), expected)

    def test_next_multiple(self):
        f = mathutil.next_multiple
        # (n, k, expected)
        cases = [
            (5, 1, 5), (5, 2, 6), (5, 3, 6), (5, 4, 8), (5, 5, 5), (5, 6, 6),
            (32, 1, 32), (32, 2, 32), (32, 3, 33), (32, 4, 32),
            (32, 5, 35), (32, 6, 36), (32, 7, 35), (32, 8, 32),
            (32, 9, 36), (32, 10, 40), (32, 11, 33), (32, 12, 36),
            (32, 13, 39), (32, 14, 42), (32, 15, 45), (32, 16, 32),
            (32, 17, 34), (32, 18, 36),
            (32, 589, 589),
            ]
        for (n, k, expected) in cases:
            self.failUnlessEqual(f(n, k), expected)

    def test_pad_size(self):
        f = mathutil.pad_size
        # (n, k, expected)
        cases = [(0, 4, 0), (1, 4, 3), (2, 4, 2), (3, 4, 1), (4, 4, 0),
                 (5, 4, 3)]
        for (n, k, expected) in cases:
            self.failUnlessEqual(f(n, k), expected)

    def test_is_power_of_k(self):
        f = mathutil.is_power_of_k

        powers_of_2 = (1, 2, 4, 8, 16, 32, 64)
        for i in range(1, 100):
            answer = f(i, 2)
            if i in powers_of_2:
                self.failUnless(answer, "but %d *is* a power of 2" % i)
            else:
                self.failIf(answer, "but %d is *not* a power of 2" % i)

        powers_of_3 = (1, 3, 9, 27, 81)
        for i in range(1, 100):
            answer = f(i, 3)
            if i in powers_of_3:
                self.failUnless(answer, "but %d *is* a power of 3" % i)
            else:
                self.failIf(answer, "but %d is *not* a power of 3" % i)

    def test_next_power_of_k(self):
        f = mathutil.next_power_of_k
        # Each entry is (k, singles, spans): singles are (n, expected) pairs,
        # spans are (start, stop, expected) checked over range(start, stop).
        plan = [
            (2,
             [(0, 1), (1, 1), (2, 2), (3, 4), (4, 4)],
             [(5, 8, 8), (9, 16, 16), (17, 32, 32), (33, 64, 64),
              (65, 100, 128)]),
            (3,
             [(0, 1), (1, 1), (2, 3), (3, 3)],
             [(4, 9, 9), (10, 27, 27), (28, 81, 81), (82, 200, 243)]),
            ]
        for (k, singles, spans) in plan:
            for (n, expected) in singles:
                self.failUnlessEqual(f(n, k), expected)
            for (start, stop, expected) in spans:
                for i in range(start, stop):
                    self.failUnlessEqual(f(i, k), expected, "%d" % i)

    def test_ave(self):
        average = mathutil.ave
        self.failUnlessEqual(average([1,2,3]), 2)
        self.failUnlessEqual(average([0,0,0,4]), 1)
        self.failUnlessAlmostEqual(average([0.0, 1.0, 1.0]), .666666666666)

    def test_round_sigfigs(self):
        rounded = mathutil.round_sigfigs(22.0/3, 4)
        self.failUnlessEqual(rounded, 7.3330000000000002)
170
class Statistics(unittest.TestCase):
    """Tests for allmydata.util.statistics."""

    def should_assert(self, msg, func, *args, **kwargs):
        """Fail the test with msg unless func(*args, **kwargs) raises
        AssertionError."""
        try:
            func(*args, **kwargs)
        except AssertionError:
            return
        # self.fail() must stay outside the try block: the failure exception
        # it raises derives from AssertionError (twisted.trial's FailTest, as
        # well as the stdlib default), so calling it inside the try would let
        # the except clause swallow the failure and this helper could never
        # fail.
        self.fail(msg)

    def failUnlessListEqual(self, a, b, msg = None):
        """Fail unless a and b have equal length and equal items pairwise."""
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            self.failUnlessEqual(a[i], b[i], msg)

    def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
        """Fail unless a and b have equal length and their items are
        pairwise almost equal to the given number of places."""
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            self.failUnlessAlmostEqual(a[i], b[i], places, msg)

    def test_binomial_coeff(self):
        f = statistics.binomial_coeff
        self.failUnlessEqual(f(20, 0), 1)
        self.failUnlessEqual(f(20, 1), 20)
        self.failUnlessEqual(f(20, 2), 190)
        # C(n, k) == C(n, n-k)
        self.failUnlessEqual(f(20, 8), f(20, 12))
        self.should_assert("Should assert if n < k", f, 2, 3)

    def test_binomial_distribution_pmf(self):
        f = statistics.binomial_distribution_pmf

        pmf_comp = f(2, .1)
        pmf_stat = [0.81, 0.18, 0.01]
        self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)

        # Summing across a PMF should give the total probability 1
        self.failUnlessAlmostEqual(sum(pmf_comp), 1)
        self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
        self.should_assert("Should assert if n < 1", f, 0, .1)

    def test_survival_pmf(self):
        f = statistics.survival_pmf
        # Cross-check binomial-distribution method against convolution
        # method.
        p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
        pmf1 = statistics.survival_pmf_via_conv(p_list)
        pmf2 = statistics.survival_pmf_via_bd(p_list)
        self.failUnlessListAlmostEqual(pmf1, pmf2)
        self.failUnlessTrue(statistics.valid_pmf(pmf1))
        self.should_assert("Should assert if p_i > 1", f, [1.1])
        self.should_assert("Should assert if p_i < 0", f, [-.1])

    def test_convolve(self):
        f = statistics.convolve
        v1 = [ 1, 2, 3 ]
        v2 = [ 4, 5, 6 ]
        v3 = [ 7, 8 ]
        v1v2result = [ 4, 13, 28, 27, 18 ]
        # Convolution is commutative
        r1 = f(v1, v2)
        r2 = f(v2, v1)
        self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
        self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
        # Convolution is associative
        r1 = f(f(v1, v2), v3)
        r2 = f(v1, f(v2, v3))
        self.failUnlessListEqual(r1, r2, "Convolution should be associative")
        # Convolution is distributive
        r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
        tmp1 = f(v3, v1)
        tmp2 = f(v3, v2)
        r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
        self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
        # Convolution is scalar multiplication associative
        tmp1 = f(v1, v2)
        r1 = [ a * 4 for a in tmp1 ]
        tmp2 = [ a * 4 for a in v1 ]
        r2 = f(tmp2, v2)
        self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")

    def test_find_k(self):
        f = statistics.find_k
        g = statistics.pr_file_loss
        plist = [.9] * 10 + [.8] * 10
        t = .0001
        k = f(plist, t)
        self.failUnlessEqual(k, 10)
        # The k that was found must keep the loss probability below t.
        self.failUnless(g(plist, k) < t)

    def test_pr_file_loss(self):
        f = statistics.pr_file_loss
        plist = [.5] * 10
        self.failUnlessEqual(f(plist, 3), .0546875)

    def test_pr_backup_file_loss(self):
        f = statistics.pr_backup_file_loss
        plist = [.5] * 10
        self.failUnlessEqual(f(plist, .5, 3), .02734375)
268
269
270 class Asserts(unittest.TestCase):
271     def should_assert(self, func, *args, **kwargs):
272         try:
273             func(*args, **kwargs)
274         except AssertionError, e:
275             return str(e)
276         except Exception, e:
277             self.fail("assert failed with non-AssertionError: %s" % e)
278         self.fail("assert was not caught")
279
280     def should_not_assert(self, func, *args, **kwargs):
281         if "re" in kwargs:
282             regexp = kwargs["re"]
283             del kwargs["re"]
284         try:
285             func(*args, **kwargs)
286         except AssertionError, e:
287             self.fail("assertion fired when it should not have: %s" % e)
288         except Exception, e:
289             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
290         return # we're happy
291
292
293     def test_assert(self):
294         f = assertutil._assert
295         self.should_assert(f)
296         self.should_assert(f, False)
297         self.should_not_assert(f, True)
298
299         m = self.should_assert(f, False, "message")
300         self.failUnlessEqual(m, "'message' <type 'str'>", m)
301         m = self.should_assert(f, False, "message1", othermsg=12)
302         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
303         m = self.should_assert(f, False, othermsg="message2")
304         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
305
306     def test_precondition(self):
307         f = assertutil.precondition
308         self.should_assert(f)
309         self.should_assert(f, False)
310         self.should_not_assert(f, True)
311
312         m = self.should_assert(f, False, "message")
313         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
314         m = self.should_assert(f, False, "message1", othermsg=12)
315         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
316         m = self.should_assert(f, False, othermsg="message2")
317         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
318
319     def test_postcondition(self):
320         f = assertutil.postcondition
321         self.should_assert(f)
322         self.should_assert(f, False)
323         self.should_not_assert(f, True)
324
325         m = self.should_assert(f, False, "message")
326         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
327         m = self.should_assert(f, False, "message1", othermsg=12)
328         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
329         m = self.should_assert(f, False, othermsg="message2")
330         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
331
332 class FileUtil(unittest.TestCase):
333     def mkdir(self, basedir, path, mode=0777):
334         fn = os.path.join(basedir, path)
335         fileutil.make_dirs(fn, mode)
336
337     def touch(self, basedir, path, mode=None, data="touch\n"):
338         fn = os.path.join(basedir, path)
339         f = open(fn, "w")
340         f.write(data)
341         f.close()
342         if mode is not None:
343             os.chmod(fn, mode)
344
345     def test_rm_dir(self):
346         basedir = "util/FileUtil/test_rm_dir"
347         fileutil.make_dirs(basedir)
348         # create it again to test idempotency
349         fileutil.make_dirs(basedir)
350         d = os.path.join(basedir, "doomed")
351         self.mkdir(d, "a/b")
352         self.touch(d, "a/b/1.txt")
353         self.touch(d, "a/b/2.txt", 0444)
354         self.touch(d, "a/b/3.txt", 0)
355         self.mkdir(d, "a/c")
356         self.touch(d, "a/c/1.txt")
357         self.touch(d, "a/c/2.txt", 0444)
358         self.touch(d, "a/c/3.txt", 0)
359         os.chmod(os.path.join(d, "a/c"), 0444)
360         self.mkdir(d, "a/d")
361         self.touch(d, "a/d/1.txt")
362         self.touch(d, "a/d/2.txt", 0444)
363         self.touch(d, "a/d/3.txt", 0)
364         os.chmod(os.path.join(d, "a/d"), 0)
365
366         fileutil.rm_dir(d)
367         self.failIf(os.path.exists(d))
368         # remove it again to test idempotency
369         fileutil.rm_dir(d)
370
371     def test_remove_if_possible(self):
372         basedir = "util/FileUtil/test_remove_if_possible"
373         fileutil.make_dirs(basedir)
374         self.touch(basedir, "here")
375         fn = os.path.join(basedir, "here")
376         fileutil.remove_if_possible(fn)
377         self.failIf(os.path.exists(fn))
378         fileutil.remove_if_possible(fn) # should be idempotent
379         fileutil.rm_dir(basedir)
380         fileutil.remove_if_possible(fn) # should survive errors
381
382     def test_open_or_create(self):
383         basedir = "util/FileUtil/test_open_or_create"
384         fileutil.make_dirs(basedir)
385         fn = os.path.join(basedir, "here")
386         f = fileutil.open_or_create(fn)
387         f.write("stuff.")
388         f.close()
389         f = fileutil.open_or_create(fn)
390         f.seek(0, 2)
391         f.write("more.")
392         f.close()
393         f = open(fn, "r")
394         data = f.read()
395         f.close()
396         self.failUnlessEqual(data, "stuff.more.")
397
398     def test_NamedTemporaryDirectory(self):
399         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
400         fileutil.make_dirs(basedir)
401         td = fileutil.NamedTemporaryDirectory(dir=basedir)
402         name = td.name
403         self.failUnless(basedir in name)
404         self.failUnless(basedir in repr(td))
405         self.failUnless(os.path.isdir(name))
406         del td
407         # it is conceivable that we need to force gc here, but I'm not sure
408         self.failIf(os.path.isdir(name))
409
410     def test_rename(self):
411         basedir = "util/FileUtil/test_rename"
412         fileutil.make_dirs(basedir)
413         self.touch(basedir, "here")
414         fn = os.path.join(basedir, "here")
415         fn2 = os.path.join(basedir, "there")
416         fileutil.rename(fn, fn2)
417         self.failIf(os.path.exists(fn))
418         self.failUnless(os.path.exists(fn2))
419
420     def test_du(self):
421         basedir = "util/FileUtil/test_du"
422         fileutil.make_dirs(basedir)
423         d = os.path.join(basedir, "space-consuming")
424         self.mkdir(d, "a/b")
425         self.touch(d, "a/b/1.txt", data="a"*10)
426         self.touch(d, "a/b/2.txt", data="b"*11)
427         self.mkdir(d, "a/c")
428         self.touch(d, "a/c/1.txt", data="c"*12)
429         self.touch(d, "a/c/2.txt", data="d"*13)
430
431         used = fileutil.du(basedir)
432         self.failUnlessEqual(10+11+12+13, used)
433
class PollMixinTests(unittest.TestCase):
    """Tests for pollmixin.PollMixin.poll()."""

    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        # The check function is true on every call.
        return self.pm.poll(check_f=lambda : True,
                            pollinterval=0.1)

    def test_PollMixin_False_then_True(self):
        # The check function answers False on its first call, True on the
        # second.
        answers = iter([False, True])
        return self.pm.poll(check_f=answers.next,
                            pollinterval=0.1)

    def test_timeout(self):
        # The check function is never true, so the Deferred must errback with
        # pollmixin.TimeoutError.
        def _unexpected_success(res):
            self.fail("poll should have failed, not returned %s" % (res,))
        def _expected_failure(f):
            f.trap(pollmixin.TimeoutError)
            return None # success
        d = self.pm.poll(check_f=lambda: False,
                         pollinterval=0.01,
                         timeout=1)
        d.addCallbacks(_unexpected_success, _expected_failure)
        return d
460
class DeferredUtilTests(unittest.TestCase):
    """Tests for gatherResults and DeferredListShouldSucceed in
    allmydata.util.deferredutil."""

    def test_gather_results(self):
        # Errbacking one input must errback the gathered Deferred with the
        # same exception type.
        first = defer.Deferred()
        second = defer.Deferred()
        gathered = deferredutil.gatherResults([first, second])
        first.errback(ValueError("BAD"))
        def _callb(res):
            self.fail("Should have errbacked, not resulted in %s" % (res,))
        def _errb(thef):
            thef.trap(ValueError)
        gathered.addCallbacks(_callb, _errb)
        return gathered

    def test_success(self):
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        good, bad = [], []
        dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
        dlss.addCallbacks(good.append, bad.append)
        d1.callback(1)
        d2.callback(2)
        self.failUnlessEqual(good, [[1,2]])
        self.failUnlessEqual(bad, [])

    def test_failure(self):
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        good, bad = [], []
        dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
        dlss.addCallbacks(good.append, bad.append)
        # These errbacks are added after the list has been built around d1
        # and d2.
        for each in (d1, d2):
            each.addErrback(lambda _ignore: None)
        d1.callback(1)
        d2.errback(RuntimeError())
        self.failUnlessEqual(good, [])
        self.failUnlessEqual(len(bad), 1)
        f = bad[0]
        self.failUnless(isinstance(f, failure.Failure))
        self.failUnless(f.check(RuntimeError))
500
class HashUtilTests(unittest.TestCase):
    """Tests for allmydata.util.hashutil."""

    def test_random_key(self):
        key = hashutil.random_key()
        self.failUnlessEqual(len(key), hashutil.KEYLEN)

    def test_sha256d(self):
        one_shot = hashutil.tagged_hash("tag1", "value")
        hasher = hashutil.tagged_hasher("tag1")
        hasher.update("value")
        # digest() is called twice to check that it gives the same answer
        # each time.
        first_digest = hasher.digest()
        second_digest = hasher.digest()
        self.failUnlessEqual(one_shot, first_digest)
        self.failUnlessEqual(first_digest, second_digest)

    def test_sha256d_truncated(self):
        one_shot = hashutil.tagged_hash("tag1", "value", 16)
        hasher = hashutil.tagged_hasher("tag1", 16)
        hasher.update("value")
        incremental = hasher.digest()
        self.failUnlessEqual(len(one_shot), 16)
        self.failUnlessEqual(len(incremental), 16)
        self.failUnlessEqual(one_shot, incremental)

    def test_chk(self):
        one_shot = hashutil.convergence_hash(3, 10, 1000, "data", "secret")
        hasher = hashutil.convergence_hasher(3, 10, 1000, "secret")
        hasher.update("data")
        self.failUnlessEqual(one_shot, hasher.digest())

    def test_hashers(self):
        # Each one-shot hash function must agree with its incremental hasher
        # when fed the same data.
        pairs = [
            (hashutil.block_hash, hashutil.block_hasher),
            (hashutil.uri_extension_hash, hashutil.uri_extension_hasher),
            (hashutil.plaintext_hash, hashutil.plaintext_hasher),
            (hashutil.crypttext_hash, hashutil.crypttext_hasher),
            (hashutil.crypttext_segment_hash,
             hashutil.crypttext_segment_hasher),
            (hashutil.plaintext_segment_hash,
             hashutil.plaintext_segment_hasher),
            ]
        for (hash_fn, make_hasher) in pairs:
            one_shot = hash_fn("foo")
            hasher = make_hasher()
            hasher.update("foo")
            self.failUnlessEqual(one_shot, hasher.digest())
562
class Abbreviate(unittest.TestCase):
    """Tests for allmydata.util.abbreviate."""

    def test_time(self):
        a = abbreviate.abbreviate_time
        MIN = 60
        HOUR = 60*MIN
        DAY = 24*HOUR
        MONTH = 30*DAY
        YEAR = 365*DAY
        # (seconds, expected text)
        cases = [
            (None, "unknown"),
            (0, "0 seconds"),
            (1, "1 second"),
            (2, "2 seconds"),
            (119, "119 seconds"),
            (2*MIN, "2 minutes"),
            (60*MIN, "60 minutes"),
            (179*MIN, "179 minutes"),
            (180*MIN, "3 hours"),
            (4*HOUR, "4 hours"),
            (2*DAY, "2 days"),
            (2*MONTH, "2 months"),
            (5*YEAR, "5 years"),
            ]
        for (seconds, expected) in cases:
            self.failUnlessEqual(a(seconds), expected)

    def test_space(self):
        def _check_all(cases, use_si):
            # Compare abbreviate_space() against every (size, text) pair.
            for (size, expected) in cases:
                got = abbreviate.abbreviate_space(size, SI=use_si)
                self.failUnlessEqual(got, expected)

        tests_si = [(None, "unknown"),
                    (0, "0 B"),
                    (1, "1 B"),
                    (999, "999 B"),
                    (1000, "1000 B"),
                    (1023, "1023 B"),
                    (1024, "1.02 kB"),
                    (20*1000, "20.00 kB"),
                    (1024*1024, "1.05 MB"),
                    (1000*1000, "1.00 MB"),
                    (1000*1000*1000, "1.00 GB"),
                    (1000*1000*1000*1000, "1.00 TB"),
                    (1000*1000*1000*1000*1000, "1.00 PB"),
                    (1234567890123456, "1.23 PB"),
                    ]
        _check_all(tests_si, True)

        tests_base1024 = [(None, "unknown"),
                          (0, "0 B"),
                          (1, "1 B"),
                          (999, "999 B"),
                          (1000, "1000 B"),
                          (1023, "1023 B"),
                          (1024, "1.00 kiB"),
                          (20*1024, "20.00 kiB"),
                          (1000*1000, "976.56 kiB"),
                          (1024*1024, "1.00 MiB"),
                          (1024*1024*1024, "1.00 GiB"),
                          (1024*1024*1024*1024, "1.00 TiB"),
                          (1000*1000*1000*1000*1000, "909.49 TiB"),
                          (1024*1024*1024*1024*1024, "1.00 PiB"),
                          (1234567890123456, "1.10 PiB"),
                          ]
        _check_all(tests_base1024, False)

        both = abbreviate.abbreviate_space_both(1234567)
        self.failUnlessEqual(both, "(1.23 MB, 1.18 MiB)")

    def test_parse_space(self):
        p = abbreviate.parse_abbreviated_size
        # (input, expected size)
        cases = [
            ("", None),
            (None, None),
            ("123", 123),
            ("123B", 123),
            ("2K", 2000),
            ("2kb", 2000),
            ("2KiB", 2048),
            ("10MB", 10*1000*1000),
            ("10MiB", 10*1024*1024),
            ("5G", 5*1000*1000*1000),
            ("4GiB", 4*1024*1024*1024),
            ]
        for (text, expected) in cases:
            self.failUnlessEqual(p(text), expected)
        # An unparseable size must raise ValueError mentioning the input.
        e = self.failUnlessRaises(ValueError, p, "12 cubits")
        self.failUnless("12 cubits" in str(e))
643
class Limiter(unittest.TestCase):
    """Tests for limiter.ConcurrencyLimiter."""

    def job(self, i, foo):
        # Record the call, bump the in-flight counters, and return a Deferred
        # that fires with "done <i>" one second later.
        self.calls.append( (i, foo) )
        self.simultaneous += 1
        if self.simultaneous > self.peak_simultaneous:
            self.peak_simultaneous = self.simultaneous
        finished = defer.Deferred()
        def _finish():
            self.simultaneous -= 1
            finished.callback("done %d" % i)
        reactor.callLater(1.0, _finish)
        return finished

    def bad_job(self, i, foo):
        # A job that fails synchronously.
        raise RuntimeError("bad_job %d" % i)

    def test_limiter(self):
        self.calls = []
        self.simultaneous = 0
        self.peak_simultaneous = 0
        lim = limiter.ConcurrencyLimiter()
        dl = [lim.add(self.job, n, foo=str(n)) for n in range(20)]
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _done(res):
            # Every job ran and finished, and no more than ten overlapped.
            self.failUnlessEqual(self.simultaneous, 0)
            self.failUnless(self.peak_simultaneous <= 10)
            self.failUnlessEqual(len(self.calls), 20)
            for n in range(20):
                self.failUnless( (n, str(n)) in self.calls)
        d.addCallback(_done)
        return d

    def test_errors(self):
        self.calls = []
        self.simultaneous = 0
        self.peak_simultaneous = 0
        lim = limiter.ConcurrencyLimiter()
        dl = [lim.add(self.job, n, foo=str(n)) for n in range(20)]
        # The failing job is queued behind the twenty good ones and is not
        # part of the DeferredList.
        d2 = lim.add(self.bad_job, 21, "21")
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _most_done(res):
            results = []
            for (success, result) in res:
                self.failUnlessEqual(success, True)
                results.append(result)
            results.sort()
            expected_results = ["done %d" % n for n in range(20)]
            expected_results.sort()
            self.failUnlessEqual(results, expected_results)
            self.failUnless(self.peak_simultaneous <= 10)
            self.failUnlessEqual(len(self.calls), 20)
            for n in range(20):
                self.failUnless( (n, str(n)) in self.calls)
            def _good(res):
                self.fail("should have failed, not got %s" % (res,))
            def _err(f):
                f.trap(RuntimeError)
                self.failUnless("bad_job 21" in str(f))
            # Returning d2 makes the outer chain wait for the bad job's
            # outcome.
            d2.addCallbacks(_good, _err)
            return d2
        d.addCallback(_most_done)
        def _all_done(res):
            self.failUnlessEqual(self.simultaneous, 0)
            self.failUnless(self.peak_simultaneous <= 10)
            self.failUnlessEqual(len(self.calls), 20)
            for n in range(20):
                self.failUnless( (n, str(n)) in self.calls)
        d.addCallback(_all_done)
        return d
716
class TimeFormat(unittest.TestCase):
    """Tests for allmydata.util.time_format."""

    def test_epoch(self):
        parse = time_format.iso_utc_time_to_localseconds
        # The date/time separator may be "T", "_" or a space.
        for stamp in ("1970-01-01T00:00:01",
                      "1970-01-01_00:00:01",
                      "1970-01-01 00:00:01"):
            self.failUnlessEqual(parse(stamp), 1.0)

        self.failUnlessEqual(time_format.iso_utc(1.0), "1970-01-01_00:00:01")
        self.failUnlessEqual(time_format.iso_utc(1.0, sep=" "),
                             "1970-01-01 00:00:01")
        # Here t is passed as a callable rather than a number.
        def my_time():
            return 1.0
        self.failUnlessEqual(time_format.iso_utc(t=my_time),
                             "1970-01-01_00:00:01")
        e = self.failUnlessRaises(ValueError, parse, "invalid timestring")
        self.failUnless("not a complete ISO8601 timestamp" in str(e))
        # Fractional seconds are expected to be kept.
        self.failUnlessEqual(parse("1970-01-01_00:00:01.500"), 1.5)
740
class CacheDir(unittest.TestCase):
    """Tests for cachedir.CacheDirectoryManager."""

    def test_basic(self):
        basedir = "test_util/CacheDir/test_basic"

        def _check(missing, present):
            # Assert that each name in missing is absent from basedir and
            # each name in present exists there.
            for name in missing:
                absfn = os.path.join(basedir, name)
                self.failIf(os.path.exists(absfn),
                            "%s exists but it shouldn't" % absfn)
            for name in present:
                absfn = os.path.join(basedir, name)
                self.failUnless(os.path.exists(absfn),
                                "%s doesn't exist but it should" % absfn)

        def _write_hi(filename):
            out = open(filename, "wb")
            out.write("hi")
            out.close()

        cdm = cachedir.CacheDirectoryManager(basedir)
        # NOTE(review): the local names a, b, c and b2 hold the handles
        # returned by get_file(); when each is deleted relative to the
        # check() calls is what this test exercises, so keep the order.
        a = cdm.get_file("a")
        b = cdm.get_file("b")
        c = cdm.get_file("c")
        _write_hi(a.get_filename())
        _write_hi(b.get_filename())
        _write_hi(c.get_filename())

        _check([], ["a", "b", "c"])

        cdm.check()

        _check([], ["a", "b", "c"])

        del a
        # this file won't be deleted yet, because it isn't old enough
        cdm.check()
        _check([], ["a", "b", "c"])

        # we change the definition of "old" to make everything old
        cdm.old = -10

        cdm.check()
        _check(["a"], ["b", "c"])

        cdm.old = 60*60

        del b

        cdm.check()
        _check(["a"], ["b", "c"])

        b2 = cdm.get_file("b")

        cdm.check()
        _check(["a"], ["b", "c"])