2 def foo(): pass # keep the line number constant
4 import os, time, random
5 from twisted.trial import unittest
6 from twisted.internet import defer, reactor
7 from twisted.python import failure
9 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
10 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
11 from allmydata.util import limiter, time_format, pollmixin, cachedir
12 from allmydata.util import statistics
14 class Base32(unittest.TestCase):
15 def test_b2a_matches_Pythons(self):
17 y = "\x12\x34\x45\x67\x89\x0a\xbc\xde\xf0"
18 x = base64.b32encode(y)
19 while x and x[-1] == '=':
22 self.failUnlessEqual(base32.b2a(y), x)
24 self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
25 def test_b2a_or_none(self):
26 self.failUnlessEqual(base32.b2a_or_none(None), None)
27 self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
29 self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
30 self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
32 class IDLib(unittest.TestCase):
33 def test_nodeid_b2a(self):
34 self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
36 class NoArgumentException(Exception):
40 class HumanReadable(unittest.TestCase):
43 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
44 self.failUnlessEqual(hr(self.test_repr),
45 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
46 self.failUnlessEqual(hr(1L), "1")
47 self.failUnlessEqual(hr(10**40),
48 "100000000000000000...000000000000000000")
49 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
50 self.failUnlessEqual(hr([1,2]), "[1, 2]")
51 self.failUnlessEqual(hr({1:2}), "{1:2}")
56 hr(e) == "<RuntimeError: ()>" # python-2.4
57 or hr(e) == "RuntimeError()") # python-2.5
59 raise RuntimeError("oops")
62 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
63 or hr(e) == "RuntimeError('oops',)") # python-2.5
65 raise NoArgumentException
68 hr(e) == "<NoArgumentException>" # python-2.4
69 or hr(e) == "NoArgumentException()") # python-2.5
75 class Math(unittest.TestCase):
76 def test_div_ceil(self):
78 self.failUnlessEqual(f(0, 1), 0)
79 self.failUnlessEqual(f(0, 2), 0)
80 self.failUnlessEqual(f(0, 3), 0)
81 self.failUnlessEqual(f(1, 3), 1)
82 self.failUnlessEqual(f(2, 3), 1)
83 self.failUnlessEqual(f(3, 3), 1)
84 self.failUnlessEqual(f(4, 3), 2)
85 self.failUnlessEqual(f(5, 3), 2)
86 self.failUnlessEqual(f(6, 3), 2)
87 self.failUnlessEqual(f(7, 3), 3)
89 def test_next_multiple(self):
90 f = mathutil.next_multiple
91 self.failUnlessEqual(f(5, 1), 5)
92 self.failUnlessEqual(f(5, 2), 6)
93 self.failUnlessEqual(f(5, 3), 6)
94 self.failUnlessEqual(f(5, 4), 8)
95 self.failUnlessEqual(f(5, 5), 5)
96 self.failUnlessEqual(f(5, 6), 6)
97 self.failUnlessEqual(f(32, 1), 32)
98 self.failUnlessEqual(f(32, 2), 32)
99 self.failUnlessEqual(f(32, 3), 33)
100 self.failUnlessEqual(f(32, 4), 32)
101 self.failUnlessEqual(f(32, 5), 35)
102 self.failUnlessEqual(f(32, 6), 36)
103 self.failUnlessEqual(f(32, 7), 35)
104 self.failUnlessEqual(f(32, 8), 32)
105 self.failUnlessEqual(f(32, 9), 36)
106 self.failUnlessEqual(f(32, 10), 40)
107 self.failUnlessEqual(f(32, 11), 33)
108 self.failUnlessEqual(f(32, 12), 36)
109 self.failUnlessEqual(f(32, 13), 39)
110 self.failUnlessEqual(f(32, 14), 42)
111 self.failUnlessEqual(f(32, 15), 45)
112 self.failUnlessEqual(f(32, 16), 32)
113 self.failUnlessEqual(f(32, 17), 34)
114 self.failUnlessEqual(f(32, 18), 36)
115 self.failUnlessEqual(f(32, 589), 589)
117 def test_pad_size(self):
118 f = mathutil.pad_size
119 self.failUnlessEqual(f(0, 4), 0)
120 self.failUnlessEqual(f(1, 4), 3)
121 self.failUnlessEqual(f(2, 4), 2)
122 self.failUnlessEqual(f(3, 4), 1)
123 self.failUnlessEqual(f(4, 4), 0)
124 self.failUnlessEqual(f(5, 4), 3)
126 def test_is_power_of_k(self):
127 f = mathutil.is_power_of_k
128 for i in range(1, 100):
129 if i in (1, 2, 4, 8, 16, 32, 64):
130 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
132 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
133 for i in range(1, 100):
134 if i in (1, 3, 9, 27, 81):
135 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
137 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
139 def test_next_power_of_k(self):
140 f = mathutil.next_power_of_k
141 self.failUnlessEqual(f(0,2), 1)
142 self.failUnlessEqual(f(1,2), 1)
143 self.failUnlessEqual(f(2,2), 2)
144 self.failUnlessEqual(f(3,2), 4)
145 self.failUnlessEqual(f(4,2), 4)
146 for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
147 for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
148 for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
149 for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
150 for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
152 self.failUnlessEqual(f(0,3), 1)
153 self.failUnlessEqual(f(1,3), 1)
154 self.failUnlessEqual(f(2,3), 3)
155 self.failUnlessEqual(f(3,3), 3)
156 for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
157 for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
158 for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
159 for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
163 self.failUnlessEqual(f([1,2,3]), 2)
164 self.failUnlessEqual(f([0,0,0,4]), 1)
165 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
167 def test_round_sigfigs(self):
168 f = mathutil.round_sigfigs
169 self.failUnlessEqual(f(22.0/3, 4), 7.3330000000000002)
171 class Statistics(unittest.TestCase):
172 def should_assert(self, msg, func, *args, **kwargs):
174 func(*args, **kwargs)
176 except AssertionError, e:
179 def failUnlessListEqual(self, a, b, msg = None):
180 self.failUnlessEqual(len(a), len(b))
181 for i in range(len(a)):
182 self.failUnlessEqual(a[i], b[i], msg)
184 def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
185 self.failUnlessEqual(len(a), len(b))
186 for i in range(len(a)):
187 self.failUnlessAlmostEqual(a[i], b[i], places, msg)
189 def test_binomial_coeff(self):
190 f = statistics.binomial_coeff
191 self.failUnlessEqual(f(20, 0), 1)
192 self.failUnlessEqual(f(20, 1), 20)
193 self.failUnlessEqual(f(20, 2), 190)
194 self.failUnlessEqual(f(20, 8), f(20, 12))
195 self.should_assert("Should assert if n < k", f, 2, 3)
197 def test_binomial_distribution_pmf(self):
198 f = statistics.binomial_distribution_pmf
201 pmf_stat = [0.81, 0.18, 0.01]
202 self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)
204 # Summing across a PMF should give the total probability 1
205 self.failUnlessAlmostEqual(sum(pmf_comp), 1)
206 self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
207 self.should_assert("Should assert if n < 1", f, 0, .1)
209 def test_survival_pmf(self):
210 f = statistics.survival_pmf
211 # Cross-check binomial-distribution method against convolution
213 p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
214 pmf1 = statistics.survival_pmf_via_conv(p_list)
215 pmf2 = statistics.survival_pmf_via_bd(p_list)
216 self.failUnlessListAlmostEqual(pmf1, pmf2)
217 self.failUnlessTrue(statistics.valid_pmf(pmf1))
218 self.should_assert("Should assert if p_i > 1", f, [1.1]);
219 self.should_assert("Should assert if p_i < 0", f, [-.1]);
222 def test_convolve(self):
223 f = statistics.convolve
227 v1v2result = [ 4, 13, 28, 27, 18 ]
228 # Convolution is commutative
231 self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
232 self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
233 # Convolution is associative
234 r1 = f(f(v1, v2), v3)
235 r2 = f(v1, f(v2, v3))
236 self.failUnlessListEqual(r1, r2, "Convolution should be associative")
237 # Convolution is distributive
238 r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
241 r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
242 self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
243 # Convolution is scalar multiplication associative
245 r1 = [ a * 4 for a in tmp1 ]
246 tmp2 = [ a * 4 for a in v1 ]
248 self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")
250 def test_find_k(self):
251 f = statistics.find_k
252 g = statistics.pr_file_loss
253 plist = [.9] * 10 + [.8] * 10
256 self.failUnlessEqual(k, 10)
257 self.failUnless(g(plist, k) < t)
259 def test_pr_file_loss(self):
260 f = statistics.pr_file_loss
262 self.failUnlessEqual(f(plist, 3), .0546875)
264 def test_pr_backup_file_loss(self):
265 f = statistics.pr_backup_file_loss
267 self.failUnlessEqual(f(plist, .5, 3), .02734375)
270 class Asserts(unittest.TestCase):
271 def should_assert(self, func, *args, **kwargs):
273 func(*args, **kwargs)
274 except AssertionError, e:
277 self.fail("assert failed with non-AssertionError: %s" % e)
278 self.fail("assert was not caught")
280 def should_not_assert(self, func, *args, **kwargs):
282 regexp = kwargs["re"]
285 func(*args, **kwargs)
286 except AssertionError, e:
287 self.fail("assertion fired when it should not have: %s" % e)
289 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
293 def test_assert(self):
294 f = assertutil._assert
295 self.should_assert(f)
296 self.should_assert(f, False)
297 self.should_not_assert(f, True)
299 m = self.should_assert(f, False, "message")
300 self.failUnlessEqual(m, "'message' <type 'str'>", m)
301 m = self.should_assert(f, False, "message1", othermsg=12)
302 self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
303 m = self.should_assert(f, False, othermsg="message2")
304 self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
306 def test_precondition(self):
307 f = assertutil.precondition
308 self.should_assert(f)
309 self.should_assert(f, False)
310 self.should_not_assert(f, True)
312 m = self.should_assert(f, False, "message")
313 self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
314 m = self.should_assert(f, False, "message1", othermsg=12)
315 self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
316 m = self.should_assert(f, False, othermsg="message2")
317 self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
319 def test_postcondition(self):
320 f = assertutil.postcondition
321 self.should_assert(f)
322 self.should_assert(f, False)
323 self.should_not_assert(f, True)
325 m = self.should_assert(f, False, "message")
326 self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
327 m = self.should_assert(f, False, "message1", othermsg=12)
328 self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
329 m = self.should_assert(f, False, othermsg="message2")
330 self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
332 class FileUtil(unittest.TestCase):
333 def mkdir(self, basedir, path, mode=0777):
334 fn = os.path.join(basedir, path)
335 fileutil.make_dirs(fn, mode)
337 def touch(self, basedir, path, mode=None, data="touch\n"):
338 fn = os.path.join(basedir, path)
345 def test_rm_dir(self):
346 basedir = "util/FileUtil/test_rm_dir"
347 fileutil.make_dirs(basedir)
348 # create it again to test idempotency
349 fileutil.make_dirs(basedir)
350 d = os.path.join(basedir, "doomed")
352 self.touch(d, "a/b/1.txt")
353 self.touch(d, "a/b/2.txt", 0444)
354 self.touch(d, "a/b/3.txt", 0)
356 self.touch(d, "a/c/1.txt")
357 self.touch(d, "a/c/2.txt", 0444)
358 self.touch(d, "a/c/3.txt", 0)
359 os.chmod(os.path.join(d, "a/c"), 0444)
361 self.touch(d, "a/d/1.txt")
362 self.touch(d, "a/d/2.txt", 0444)
363 self.touch(d, "a/d/3.txt", 0)
364 os.chmod(os.path.join(d, "a/d"), 0)
367 self.failIf(os.path.exists(d))
368 # remove it again to test idempotency
371 def test_remove_if_possible(self):
372 basedir = "util/FileUtil/test_remove_if_possible"
373 fileutil.make_dirs(basedir)
374 self.touch(basedir, "here")
375 fn = os.path.join(basedir, "here")
376 fileutil.remove_if_possible(fn)
377 self.failIf(os.path.exists(fn))
378 fileutil.remove_if_possible(fn) # should be idempotent
379 fileutil.rm_dir(basedir)
380 fileutil.remove_if_possible(fn) # should survive errors
382 def test_open_or_create(self):
383 basedir = "util/FileUtil/test_open_or_create"
384 fileutil.make_dirs(basedir)
385 fn = os.path.join(basedir, "here")
386 f = fileutil.open_or_create(fn)
389 f = fileutil.open_or_create(fn)
396 self.failUnlessEqual(data, "stuff.more.")
398 def test_NamedTemporaryDirectory(self):
399 basedir = "util/FileUtil/test_NamedTemporaryDirectory"
400 fileutil.make_dirs(basedir)
401 td = fileutil.NamedTemporaryDirectory(dir=basedir)
403 self.failUnless(basedir in name)
404 self.failUnless(basedir in repr(td))
405 self.failUnless(os.path.isdir(name))
407 # it is conceivable that we need to force gc here, but I'm not sure
408 self.failIf(os.path.isdir(name))
410 def test_rename(self):
411 basedir = "util/FileUtil/test_rename"
412 fileutil.make_dirs(basedir)
413 self.touch(basedir, "here")
414 fn = os.path.join(basedir, "here")
415 fn2 = os.path.join(basedir, "there")
416 fileutil.rename(fn, fn2)
417 self.failIf(os.path.exists(fn))
418 self.failUnless(os.path.exists(fn2))
421 basedir = "util/FileUtil/test_du"
422 fileutil.make_dirs(basedir)
423 d = os.path.join(basedir, "space-consuming")
425 self.touch(d, "a/b/1.txt", data="a"*10)
426 self.touch(d, "a/b/2.txt", data="b"*11)
428 self.touch(d, "a/c/1.txt", data="c"*12)
429 self.touch(d, "a/c/2.txt", data="d"*13)
431 used = fileutil.du(basedir)
432 self.failUnlessEqual(10+11+12+13, used)
434 class PollMixinTests(unittest.TestCase):
436 self.pm = pollmixin.PollMixin()
438 def test_PollMixin_True(self):
439 d = self.pm.poll(check_f=lambda : True,
443 def test_PollMixin_False_then_True(self):
444 i = iter([False, True])
445 d = self.pm.poll(check_f=i.next,
449 def test_timeout(self):
450 d = self.pm.poll(check_f=lambda: False,
454 self.fail("poll should have failed, not returned %s" % (res,))
456 f.trap(pollmixin.TimeoutError)
457 return None # success
458 d.addCallbacks(_suc, _err)
461 class DeferredUtilTests(unittest.TestCase):
462 def test_gather_results(self):
463 d1 = defer.Deferred()
464 d2 = defer.Deferred()
465 res = deferredutil.gatherResults([d1, d2])
466 d1.errback(ValueError("BAD"))
468 self.fail("Should have errbacked, not resulted in %s" % (res,))
470 thef.trap(ValueError)
471 res.addCallbacks(_callb, _errb)
474 def test_success(self):
475 d1, d2 = defer.Deferred(), defer.Deferred()
478 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
479 dlss.addCallbacks(good.append, bad.append)
482 self.failUnlessEqual(good, [[1,2]])
483 self.failUnlessEqual(bad, [])
485 def test_failure(self):
486 d1, d2 = defer.Deferred(), defer.Deferred()
489 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
490 dlss.addCallbacks(good.append, bad.append)
491 d1.addErrback(lambda _ignore: None)
492 d2.addErrback(lambda _ignore: None)
494 d2.errback(RuntimeError())
495 self.failUnlessEqual(good, [])
496 self.failUnlessEqual(len(bad), 1)
498 self.failUnless(isinstance(f, failure.Failure))
499 self.failUnless(f.check(RuntimeError))
501 class HashUtilTests(unittest.TestCase):
503 def test_random_key(self):
504 k = hashutil.random_key()
505 self.failUnlessEqual(len(k), hashutil.KEYLEN)
507 def test_sha256d(self):
508 h1 = hashutil.tagged_hash("tag1", "value")
509 h2 = hashutil.tagged_hasher("tag1")
513 self.failUnlessEqual(h1, h2a)
514 self.failUnlessEqual(h2a, h2b)
516 def test_sha256d_truncated(self):
517 h1 = hashutil.tagged_hash("tag1", "value", 16)
518 h2 = hashutil.tagged_hasher("tag1", 16)
521 self.failUnlessEqual(len(h1), 16)
522 self.failUnlessEqual(len(h2), 16)
523 self.failUnlessEqual(h1, h2)
526 h1 = hashutil.convergence_hash(3, 10, 1000, "data", "secret")
527 h2 = hashutil.convergence_hasher(3, 10, 1000, "secret")
530 self.failUnlessEqual(h1, h2)
532 def test_hashers(self):
533 h1 = hashutil.block_hash("foo")
534 h2 = hashutil.block_hasher()
536 self.failUnlessEqual(h1, h2.digest())
538 h1 = hashutil.uri_extension_hash("foo")
539 h2 = hashutil.uri_extension_hasher()
541 self.failUnlessEqual(h1, h2.digest())
543 h1 = hashutil.plaintext_hash("foo")
544 h2 = hashutil.plaintext_hasher()
546 self.failUnlessEqual(h1, h2.digest())
548 h1 = hashutil.crypttext_hash("foo")
549 h2 = hashutil.crypttext_hasher()
551 self.failUnlessEqual(h1, h2.digest())
553 h1 = hashutil.crypttext_segment_hash("foo")
554 h2 = hashutil.crypttext_segment_hasher()
556 self.failUnlessEqual(h1, h2.digest())
558 h1 = hashutil.plaintext_segment_hash("foo")
559 h2 = hashutil.plaintext_segment_hasher()
561 self.failUnlessEqual(h1, h2.digest())
563 class Abbreviate(unittest.TestCase):
565 a = abbreviate.abbreviate_time
566 self.failUnlessEqual(a(None), "unknown")
567 self.failUnlessEqual(a(0), "0 seconds")
568 self.failUnlessEqual(a(1), "1 second")
569 self.failUnlessEqual(a(2), "2 seconds")
570 self.failUnlessEqual(a(119), "119 seconds")
572 self.failUnlessEqual(a(2*MIN), "2 minutes")
573 self.failUnlessEqual(a(60*MIN), "60 minutes")
574 self.failUnlessEqual(a(179*MIN), "179 minutes")
576 self.failUnlessEqual(a(180*MIN), "3 hours")
577 self.failUnlessEqual(a(4*HOUR), "4 hours")
580 self.failUnlessEqual(a(2*DAY), "2 days")
581 self.failUnlessEqual(a(2*MONTH), "2 months")
583 self.failUnlessEqual(a(5*YEAR), "5 years")
585 def test_space(self):
586 tests_si = [(None, "unknown"),
593 (20*1000, "20.00 kB"),
594 (1024*1024, "1.05 MB"),
595 (1000*1000, "1.00 MB"),
596 (1000*1000*1000, "1.00 GB"),
597 (1000*1000*1000*1000, "1.00 TB"),
598 (1000*1000*1000*1000*1000, "1.00 PB"),
599 (1234567890123456, "1.23 PB"),
601 for (x, expected) in tests_si:
602 got = abbreviate.abbreviate_space(x, SI=True)
603 self.failUnlessEqual(got, expected)
605 tests_base1024 = [(None, "unknown"),
612 (20*1024, "20.00 kiB"),
613 (1000*1000, "976.56 kiB"),
614 (1024*1024, "1.00 MiB"),
615 (1024*1024*1024, "1.00 GiB"),
616 (1024*1024*1024*1024, "1.00 TiB"),
617 (1000*1000*1000*1000*1000, "909.49 TiB"),
618 (1024*1024*1024*1024*1024, "1.00 PiB"),
619 (1234567890123456, "1.10 PiB"),
621 for (x, expected) in tests_base1024:
622 got = abbreviate.abbreviate_space(x, SI=False)
623 self.failUnlessEqual(got, expected)
625 self.failUnlessEqual(abbreviate.abbreviate_space_both(1234567),
626 "(1.23 MB, 1.18 MiB)")
628 def test_parse_space(self):
629 p = abbreviate.parse_abbreviated_size
630 self.failUnlessEqual(p(""), None)
631 self.failUnlessEqual(p(None), None)
632 self.failUnlessEqual(p("123"), 123)
633 self.failUnlessEqual(p("123B"), 123)
634 self.failUnlessEqual(p("2K"), 2000)
635 self.failUnlessEqual(p("2kb"), 2000)
636 self.failUnlessEqual(p("2KiB"), 2048)
637 self.failUnlessEqual(p("10MB"), 10*1000*1000)
638 self.failUnlessEqual(p("10MiB"), 10*1024*1024)
639 self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
640 self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
641 e = self.failUnlessRaises(ValueError, p, "12 cubits")
642 self.failUnless("12 cubits" in str(e))
644 class Limiter(unittest.TestCase):
645 def job(self, i, foo):
646 self.calls.append( (i, foo) )
647 self.simultaneous += 1
648 self.peak_simultaneous = max(self.simultaneous, self.peak_simultaneous)
651 self.simultaneous -= 1
652 d.callback("done %d" % i)
653 reactor.callLater(1.0, _done)
656 def bad_job(self, i, foo):
657 raise RuntimeError("bad_job %d" % i)
659 def test_limiter(self):
661 self.simultaneous = 0
662 self.peak_simultaneous = 0
663 l = limiter.ConcurrencyLimiter()
666 dl.append(l.add(self.job, i, foo=str(i)))
667 d = defer.DeferredList(dl, fireOnOneErrback=True)
669 self.failUnlessEqual(self.simultaneous, 0)
670 self.failUnless(self.peak_simultaneous <= 10)
671 self.failUnlessEqual(len(self.calls), 20)
673 self.failUnless( (i, str(i)) in self.calls)
677 def test_errors(self):
679 self.simultaneous = 0
680 self.peak_simultaneous = 0
681 l = limiter.ConcurrencyLimiter()
684 dl.append(l.add(self.job, i, foo=str(i)))
685 d2 = l.add(self.bad_job, 21, "21")
686 d = defer.DeferredList(dl, fireOnOneErrback=True)
689 for (success, result) in res:
690 self.failUnlessEqual(success, True)
691 results.append(result)
693 expected_results = ["done %d" % i for i in range(20)]
694 expected_results.sort()
695 self.failUnlessEqual(results, expected_results)
696 self.failUnless(self.peak_simultaneous <= 10)
697 self.failUnlessEqual(len(self.calls), 20)
699 self.failUnless( (i, str(i)) in self.calls)
701 self.fail("should have failed, not got %s" % (res,))
704 self.failUnless("bad_job 21" in str(f))
705 d2.addCallbacks(_good, _err)
707 d.addCallback(_most_done)
709 self.failUnlessEqual(self.simultaneous, 0)
710 self.failUnless(self.peak_simultaneous <= 10)
711 self.failUnlessEqual(len(self.calls), 20)
713 self.failUnless( (i, str(i)) in self.calls)
714 d.addCallback(_all_done)
717 class TimeFormat(unittest.TestCase):
718 def test_epoch(self):
719 s = time_format.iso_utc_time_to_localseconds("1970-01-01T00:00:01")
720 self.failUnlessEqual(s, 1.0)
721 s = time_format.iso_utc_time_to_localseconds("1970-01-01_00:00:01")
722 self.failUnlessEqual(s, 1.0)
723 s = time_format.iso_utc_time_to_localseconds("1970-01-01 00:00:01")
724 self.failUnlessEqual(s, 1.0)
726 self.failUnlessEqual(time_format.iso_utc(1.0), "1970-01-01_00:00:01")
727 self.failUnlessEqual(time_format.iso_utc(1.0, sep=" "),
728 "1970-01-01 00:00:01")
732 self.failUnlessEqual(time_format.iso_utc(t=my_time),
733 "1970-01-01_00:00:01")
734 e = self.failUnlessRaises(ValueError,
735 time_format.iso_utc_time_to_localseconds,
736 "invalid timestring")
737 self.failUnless("not a complete ISO8601 timestamp" in str(e))
738 s = time_format.iso_utc_time_to_localseconds("1970-01-01_00:00:01.500")
739 self.failUnlessEqual(s, 1.5)
741 class CacheDir(unittest.TestCase):
742 def test_basic(self):
743 basedir = "test_util/CacheDir/test_basic"
745 def _failIfExists(name):
746 absfn = os.path.join(basedir, name)
747 self.failIf(os.path.exists(absfn),
748 "%s exists but it shouldn't" % absfn)
750 def _failUnlessExists(name):
751 absfn = os.path.join(basedir, name)
752 self.failUnless(os.path.exists(absfn),
753 "%s doesn't exist but it should" % absfn)
755 cdm = cachedir.CacheDirectoryManager(basedir)
756 a = cdm.get_file("a")
757 b = cdm.get_file("b")
758 c = cdm.get_file("c")
759 f = open(a.get_filename(), "wb"); f.write("hi"); f.close(); del f
760 f = open(b.get_filename(), "wb"); f.write("hi"); f.close(); del f
761 f = open(c.get_filename(), "wb"); f.write("hi"); f.close(); del f
763 _failUnlessExists("a")
764 _failUnlessExists("b")
765 _failUnlessExists("c")
769 _failUnlessExists("a")
770 _failUnlessExists("b")
771 _failUnlessExists("c")
774 # this file won't be deleted yet, because it isn't old enough
776 _failUnlessExists("a")
777 _failUnlessExists("b")
778 _failUnlessExists("c")
780 # we change the definition of "old" to make everything old
785 _failUnlessExists("b")
786 _failUnlessExists("c")
794 _failUnlessExists("b")
795 _failUnlessExists("c")
797 b2 = cdm.get_file("b")
801 _failUnlessExists("b")
802 _failUnlessExists("c")