git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_util.py
test_util: get almost full test coverage of dictutil, starting with the original...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
1
def foo(): return None # must stay on line 2 of this file: HumanReadable.test_repr asserts "test_util.py:2"
3
4 import os, time
5 from StringIO import StringIO
6 from twisted.trial import unittest
7 from twisted.internet import defer, reactor
8 from twisted.python import failure
9
10 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
11 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
12 from allmydata.util import limiter, time_format, pollmixin, cachedir
13 from allmydata.util import statistics, dictutil
14
class Base32(unittest.TestCase):
    """Tests for the encoder/decoder in allmydata.util.base32."""

    def test_b2a_matches_Pythons(self):
        # base32.b2a() is compared against the stdlib encoder after the
        # stdlib's trailing '=' padding is removed and the text lowercased.
        import base64
        raw = "\x12\x34\x45\x67\x89\x0a\xbc\xde\xf0"
        expected = base64.b32encode(raw).rstrip('=').lower()
        self.failUnlessEqual(base32.b2a(raw), expected)

    def test_b2a(self):
        encoded = base32.b2a("\x12\x34")
        self.failUnlessEqual(encoded, "ci2a")

    def test_b2a_or_none(self):
        encode = base32.b2a_or_none
        # None passes straight through; anything else is encoded.
        self.failUnlessEqual(encode(None), None)
        self.failUnlessEqual(encode("\x12\x34"), "ci2a")

    def test_a2b(self):
        decode = base32.a2b
        self.failUnlessEqual(decode("ci2a"), "\x12\x34")
        # this input is expected to trip an assertion inside a2b
        self.failUnlessRaises(AssertionError, decode, "b0gus")
32
class IDLib(unittest.TestCase):
    """Tests for allmydata.util.idlib."""

    def test_nodeid_b2a(self):
        # A 20-byte all-zero nodeid is expected to render as 32 'a' characters.
        zero_nodeid = "\x00" * 20
        rendered = idlib.nodeid_b2a(zero_nodeid)
        self.failUnlessEqual(rendered, "a" * 32)
36
class NoArgumentException(Exception):
    """An exception whose constructor takes no arguments.

    HumanReadable.test_repr raises it to check how hr() renders an
    exception that carries no arguments.
    """

    def __init__(self):
        # Intentionally empty: nothing is passed up to Exception.__init__.
        return
40
41 class HumanReadable(unittest.TestCase):
42     def test_repr(self):
43         hr = humanreadable.hr
44         self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
45         self.failUnlessEqual(hr(self.test_repr),
46                              "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
47         self.failUnlessEqual(hr(1L), "1")
48         self.failUnlessEqual(hr(10**40),
49                              "100000000000000000...000000000000000000")
50         self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
51         self.failUnlessEqual(hr([1,2]), "[1, 2]")
52         self.failUnlessEqual(hr({1:2}), "{1:2}")
53         try:
54             raise RuntimeError
55         except Exception, e:
56             self.failUnless(
57                 hr(e) == "<RuntimeError: ()>" # python-2.4
58                 or hr(e) == "RuntimeError()") # python-2.5
59         try:
60             raise RuntimeError("oops")
61         except Exception, e:
62             self.failUnless(
63                 hr(e) == "<RuntimeError: 'oops'>" # python-2.4
64                 or hr(e) == "RuntimeError('oops',)") # python-2.5
65         try:
66             raise NoArgumentException
67         except Exception, e:
68             self.failUnless(
69                 hr(e) == "<NoArgumentException>" # python-2.4
70                 or hr(e) == "NoArgumentException()") # python-2.5
71
72
class MyList(list):
    """A trivial subclass of the builtin list, adding nothing."""
75
class Math(unittest.TestCase):
    """Tests for allmydata.util.mathutil."""

    def test_div_ceil(self):
        f = mathutil.div_ceil
        # (n, d, expected)
        cases = [(0, 1, 0), (0, 2, 0), (0, 3, 0),
                 (1, 3, 1), (2, 3, 1), (3, 3, 1),
                 (4, 3, 2), (5, 3, 2), (6, 3, 2),
                 (7, 3, 3)]
        for (n, d, expected) in cases:
            self.failUnlessEqual(f(n, d), expected)

    def test_next_multiple(self):
        f = mathutil.next_multiple
        # (n, k, expected)
        cases = [(5, 1, 5), (5, 2, 6), (5, 3, 6),
                 (5, 4, 8), (5, 5, 5), (5, 6, 6),
                 (32, 1, 32), (32, 2, 32), (32, 3, 33),
                 (32, 4, 32), (32, 5, 35), (32, 6, 36),
                 (32, 7, 35), (32, 8, 32), (32, 9, 36),
                 (32, 10, 40), (32, 11, 33), (32, 12, 36),
                 (32, 13, 39), (32, 14, 42), (32, 15, 45),
                 (32, 16, 32), (32, 17, 34), (32, 18, 36),
                 (32, 589, 589)]
        for (n, k, expected) in cases:
            self.failUnlessEqual(f(n, k), expected)

    def test_pad_size(self):
        f = mathutil.pad_size
        # (n, k, expected)
        cases = [(0, 4, 0), (1, 4, 3), (2, 4, 2),
                 (3, 4, 1), (4, 4, 0), (5, 4, 3)]
        for (n, k, expected) in cases:
            self.failUnlessEqual(f(n, k), expected)

    def test_is_power_of_k(self):
        f = mathutil.is_power_of_k
        # for each base k, the powers of k that are below 100
        known_powers = [(2, (1, 2, 4, 8, 16, 32, 64)),
                        (3, (1, 3, 9, 27, 81))]
        for (k, powers) in known_powers:
            for i in range(1, 100):
                if i in powers:
                    self.failUnless(f(i, k),
                                    "but %d *is* a power of %d" % (i, k))
                else:
                    self.failIf(f(i, k),
                                "but %d is *not* a power of %d" % (i, k))

    def test_next_power_of_k(self):
        f = mathutil.next_power_of_k
        # For each base k: individual (n, expected) checks, followed by
        # half-open (start, stop, expected) ranges of n sharing one answer.
        plan = [
            (2,
             [(0, 1), (1, 1), (2, 2), (3, 4), (4, 4)],
             [(5, 8, 8), (9, 16, 16), (17, 32, 32), (33, 64, 64),
              (65, 100, 128)]),
            (3,
             [(0, 1), (1, 1), (2, 3), (3, 3)],
             [(4, 9, 9), (10, 27, 27), (28, 81, 81), (82, 200, 243)]),
            ]
        for (k, singles, spans) in plan:
            for (n, expected) in singles:
                self.failUnlessEqual(f(n, k), expected)
            for (start, stop, expected) in spans:
                for i in range(start, stop):
                    self.failUnlessEqual(f(i, k), expected, "%d" % i)

    def test_ave(self):
        ave = mathutil.ave
        self.failUnlessEqual(ave([1,2,3]), 2)
        self.failUnlessEqual(ave([0,0,0,4]), 1)
        # float result: compared approximately rather than exactly
        self.failUnlessAlmostEqual(ave([0.0, 1.0, 1.0]), .666666666666)

    def test_round_sigfigs(self):
        rounded = mathutil.round_sigfigs(22.0/3, 4)
        self.failUnlessEqual(rounded, 7.3330000000000002)
171
172 class Statistics(unittest.TestCase):
173     def should_assert(self, msg, func, *args, **kwargs):
174         try:
175             func(*args, **kwargs)
176             self.fail(msg)
177         except AssertionError, e:
178             pass
179
180     def failUnlessListEqual(self, a, b, msg = None):
181         self.failUnlessEqual(len(a), len(b))
182         for i in range(len(a)):
183             self.failUnlessEqual(a[i], b[i], msg)
184
185     def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
186         self.failUnlessEqual(len(a), len(b))
187         for i in range(len(a)):
188             self.failUnlessAlmostEqual(a[i], b[i], places, msg)
189
190     def test_binomial_coeff(self):
191         f = statistics.binomial_coeff
192         self.failUnlessEqual(f(20, 0), 1)
193         self.failUnlessEqual(f(20, 1), 20)
194         self.failUnlessEqual(f(20, 2), 190)
195         self.failUnlessEqual(f(20, 8), f(20, 12))
196         self.should_assert("Should assert if n < k", f, 2, 3)
197
198     def test_binomial_distribution_pmf(self):
199         f = statistics.binomial_distribution_pmf
200
201         pmf_comp = f(2, .1)
202         pmf_stat = [0.81, 0.18, 0.01]
203         self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)
204         
205         # Summing across a PMF should give the total probability 1
206         self.failUnlessAlmostEqual(sum(pmf_comp), 1)
207         self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
208         self.should_assert("Should assert if n < 1", f, 0, .1)
209
210         out = StringIO()
211         statistics.print_pmf(pmf_comp, out=out)
212         lines = out.getvalue().splitlines()
213         self.failUnlessEqual(lines[0], "i=0: 0.81")
214         self.failUnlessEqual(lines[1], "i=1: 0.18")
215         self.failUnlessEqual(lines[2], "i=2: 0.01")
216
217     def test_survival_pmf(self):
218         f = statistics.survival_pmf
219         # Cross-check binomial-distribution method against convolution
220         # method.
221         p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
222         pmf1 = statistics.survival_pmf_via_conv(p_list)
223         pmf2 = statistics.survival_pmf_via_bd(p_list)
224         self.failUnlessListAlmostEqual(pmf1, pmf2)
225         self.failUnlessTrue(statistics.valid_pmf(pmf1))
226         self.should_assert("Should assert if p_i > 1", f, [1.1]);
227         self.should_assert("Should assert if p_i < 0", f, [-.1]);
228
229     def test_repair_count_pmf(self):
230         survival_pmf = statistics.binomial_distribution_pmf(5, .9)
231         repair_pmf = statistics.repair_count_pmf(survival_pmf, 3)
232         # repair_pmf[0] == sum(survival_pmf[0,1,2,5])
233         # repair_pmf[1] == survival_pmf[4]
234         # repair_pmf[2] = survival_pmf[3]
235         self.failUnlessListAlmostEqual(repair_pmf,
236                                        [0.00001 + 0.00045 + 0.0081 + 0.59049,
237                                         .32805,
238                                         .0729,
239                                         0, 0, 0])
240
241     def test_repair_cost(self):
242         survival_pmf = statistics.binomial_distribution_pmf(5, .9)
243         bwcost = statistics.bandwidth_cost_function
244         cost = statistics.mean_repair_cost(bwcost, 1000,
245                                            survival_pmf, 3, ul_dl_ratio=1.0)
246         self.failUnlessAlmostEqual(cost, 558.90)
247         cost = statistics.mean_repair_cost(bwcost, 1000,
248                                            survival_pmf, 3, ul_dl_ratio=8.0)
249         self.failUnlessAlmostEqual(cost, 1664.55)
250
251         # I haven't manually checked the math beyond here -warner
252         cost = statistics.eternal_repair_cost(bwcost, 1000,
253                                               survival_pmf, 3,
254                                               discount_rate=0, ul_dl_ratio=1.0)
255         self.failUnlessAlmostEqual(cost, 65292.056074766246)
256         cost = statistics.eternal_repair_cost(bwcost, 1000,
257                                               survival_pmf, 3,
258                                               discount_rate=0.05,
259                                               ul_dl_ratio=1.0)
260         self.failUnlessAlmostEqual(cost, 9133.6097158191551)
261
262     def test_convolve(self):
263         f = statistics.convolve
264         v1 = [ 1, 2, 3 ]
265         v2 = [ 4, 5, 6 ]
266         v3 = [ 7, 8 ]
267         v1v2result = [ 4, 13, 28, 27, 18 ]
268         # Convolution is commutative
269         r1 = f(v1, v2)
270         r2 = f(v2, v1)
271         self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
272         self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
273         # Convolution is associative
274         r1 = f(f(v1, v2), v3)
275         r2 = f(v1, f(v2, v3))
276         self.failUnlessListEqual(r1, r2, "Convolution should be associative")
277         # Convolution is distributive
278         r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
279         tmp1 = f(v3, v1)
280         tmp2 = f(v3, v2)
281         r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
282         self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
283         # Convolution is scalar multiplication associative
284         tmp1 = f(v1, v2)
285         r1 = [ a * 4 for a in tmp1 ]
286         tmp2 = [ a * 4 for a in v1 ]
287         r2 = f(tmp2, v2)
288         self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")
289
290     def test_find_k(self):
291         f = statistics.find_k
292         g = statistics.pr_file_loss
293         plist = [.9] * 10 + [.8] * 10 # N=20
294         t = .0001
295         k = f(plist, t)
296         self.failUnlessEqual(k, 10)
297         self.failUnless(g(plist, k) < t)
298
299     def test_pr_file_loss(self):
300         f = statistics.pr_file_loss
301         plist = [.5] * 10
302         self.failUnlessEqual(f(plist, 3), .0546875)
303
304     def test_pr_backup_file_loss(self):
305         f = statistics.pr_backup_file_loss
306         plist = [.5] * 10
307         self.failUnlessEqual(f(plist, .5, 3), .02734375)
308
309
310 class Asserts(unittest.TestCase):
311     def should_assert(self, func, *args, **kwargs):
312         try:
313             func(*args, **kwargs)
314         except AssertionError, e:
315             return str(e)
316         except Exception, e:
317             self.fail("assert failed with non-AssertionError: %s" % e)
318         self.fail("assert was not caught")
319
320     def should_not_assert(self, func, *args, **kwargs):
321         if "re" in kwargs:
322             regexp = kwargs["re"]
323             del kwargs["re"]
324         try:
325             func(*args, **kwargs)
326         except AssertionError, e:
327             self.fail("assertion fired when it should not have: %s" % e)
328         except Exception, e:
329             self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
330         return # we're happy
331
332
333     def test_assert(self):
334         f = assertutil._assert
335         self.should_assert(f)
336         self.should_assert(f, False)
337         self.should_not_assert(f, True)
338
339         m = self.should_assert(f, False, "message")
340         self.failUnlessEqual(m, "'message' <type 'str'>", m)
341         m = self.should_assert(f, False, "message1", othermsg=12)
342         self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
343         m = self.should_assert(f, False, othermsg="message2")
344         self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
345
346     def test_precondition(self):
347         f = assertutil.precondition
348         self.should_assert(f)
349         self.should_assert(f, False)
350         self.should_not_assert(f, True)
351
352         m = self.should_assert(f, False, "message")
353         self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
354         m = self.should_assert(f, False, "message1", othermsg=12)
355         self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
356         m = self.should_assert(f, False, othermsg="message2")
357         self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
358
359     def test_postcondition(self):
360         f = assertutil.postcondition
361         self.should_assert(f)
362         self.should_assert(f, False)
363         self.should_not_assert(f, True)
364
365         m = self.should_assert(f, False, "message")
366         self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
367         m = self.should_assert(f, False, "message1", othermsg=12)
368         self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
369         m = self.should_assert(f, False, othermsg="message2")
370         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
371
372 class FileUtil(unittest.TestCase):
373     def mkdir(self, basedir, path, mode=0777):
374         fn = os.path.join(basedir, path)
375         fileutil.make_dirs(fn, mode)
376
377     def touch(self, basedir, path, mode=None, data="touch\n"):
378         fn = os.path.join(basedir, path)
379         f = open(fn, "w")
380         f.write(data)
381         f.close()
382         if mode is not None:
383             os.chmod(fn, mode)
384
385     def test_rm_dir(self):
386         basedir = "util/FileUtil/test_rm_dir"
387         fileutil.make_dirs(basedir)
388         # create it again to test idempotency
389         fileutil.make_dirs(basedir)
390         d = os.path.join(basedir, "doomed")
391         self.mkdir(d, "a/b")
392         self.touch(d, "a/b/1.txt")
393         self.touch(d, "a/b/2.txt", 0444)
394         self.touch(d, "a/b/3.txt", 0)
395         self.mkdir(d, "a/c")
396         self.touch(d, "a/c/1.txt")
397         self.touch(d, "a/c/2.txt", 0444)
398         self.touch(d, "a/c/3.txt", 0)
399         os.chmod(os.path.join(d, "a/c"), 0444)
400         self.mkdir(d, "a/d")
401         self.touch(d, "a/d/1.txt")
402         self.touch(d, "a/d/2.txt", 0444)
403         self.touch(d, "a/d/3.txt", 0)
404         os.chmod(os.path.join(d, "a/d"), 0)
405
406         fileutil.rm_dir(d)
407         self.failIf(os.path.exists(d))
408         # remove it again to test idempotency
409         fileutil.rm_dir(d)
410
411     def test_remove_if_possible(self):
412         basedir = "util/FileUtil/test_remove_if_possible"
413         fileutil.make_dirs(basedir)
414         self.touch(basedir, "here")
415         fn = os.path.join(basedir, "here")
416         fileutil.remove_if_possible(fn)
417         self.failIf(os.path.exists(fn))
418         fileutil.remove_if_possible(fn) # should be idempotent
419         fileutil.rm_dir(basedir)
420         fileutil.remove_if_possible(fn) # should survive errors
421
422     def test_open_or_create(self):
423         basedir = "util/FileUtil/test_open_or_create"
424         fileutil.make_dirs(basedir)
425         fn = os.path.join(basedir, "here")
426         f = fileutil.open_or_create(fn)
427         f.write("stuff.")
428         f.close()
429         f = fileutil.open_or_create(fn)
430         f.seek(0, 2)
431         f.write("more.")
432         f.close()
433         f = open(fn, "r")
434         data = f.read()
435         f.close()
436         self.failUnlessEqual(data, "stuff.more.")
437
438     def test_NamedTemporaryDirectory(self):
439         basedir = "util/FileUtil/test_NamedTemporaryDirectory"
440         fileutil.make_dirs(basedir)
441         td = fileutil.NamedTemporaryDirectory(dir=basedir)
442         name = td.name
443         self.failUnless(basedir in name)
444         self.failUnless(basedir in repr(td))
445         self.failUnless(os.path.isdir(name))
446         del td
447         # it is conceivable that we need to force gc here, but I'm not sure
448         self.failIf(os.path.isdir(name))
449
450     def test_rename(self):
451         basedir = "util/FileUtil/test_rename"
452         fileutil.make_dirs(basedir)
453         self.touch(basedir, "here")
454         fn = os.path.join(basedir, "here")
455         fn2 = os.path.join(basedir, "there")
456         fileutil.rename(fn, fn2)
457         self.failIf(os.path.exists(fn))
458         self.failUnless(os.path.exists(fn2))
459
460     def test_du(self):
461         basedir = "util/FileUtil/test_du"
462         fileutil.make_dirs(basedir)
463         d = os.path.join(basedir, "space-consuming")
464         self.mkdir(d, "a/b")
465         self.touch(d, "a/b/1.txt", data="a"*10)
466         self.touch(d, "a/b/2.txt", data="b"*11)
467         self.mkdir(d, "a/c")
468         self.touch(d, "a/c/1.txt", data="c"*12)
469         self.touch(d, "a/c/2.txt", data="d"*13)
470
471         used = fileutil.du(basedir)
472         self.failUnlessEqual(10+11+12+13, used)
473
class PollMixinTests(unittest.TestCase):
    """Tests for pollmixin.PollMixin.poll()."""

    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        # the check passes on the very first poll
        def _always_true():
            return True
        return self.pm.poll(check_f=_always_true, pollinterval=0.1)

    def test_PollMixin_False_then_True(self):
        # the check fails once, then passes
        answers = iter([False, True])
        return self.pm.poll(check_f=answers.next, pollinterval=0.1)

    def test_timeout(self):
        # the check never passes, so poll() must errback with TimeoutError
        def _never_true():
            return False
        d = self.pm.poll(check_f=_never_true,
                         pollinterval=0.01,
                         timeout=1)
        def _unexpected_success(res):
            self.fail("poll should have failed, not returned %s" % (res,))
        def _expected_failure(f):
            # trap() re-raises anything that is not a TimeoutError
            f.trap(pollmixin.TimeoutError)
        d.addCallbacks(_unexpected_success, _expected_failure)
        return d
500
class DeferredUtilTests(unittest.TestCase):
    """Tests for gatherResults and DeferredListShouldSucceed in
    allmydata.util.deferredutil."""

    def test_gather_results(self):
        first = defer.Deferred()
        second = defer.Deferred()
        gathered = deferredutil.gatherResults([first, second])
        # only the first Deferred fires, and it fires with an error
        first.errback(ValueError("BAD"))
        def _unexpected_result(res):
            self.fail("Should have errbacked, not resulted in %s" % (res,))
        def _expected_error(thef):
            thef.trap(ValueError)
        gathered.addCallbacks(_unexpected_result, _expected_error)
        return gathered

    def test_success(self):
        successes, failures = [], []
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
        dlss.addCallbacks(successes.append, failures.append)
        d1.callback(1)
        d2.callback(2)
        # exactly one success, carrying both results in order
        self.failUnlessEqual(successes, [[1,2]])
        self.failUnlessEqual(failures, [])

    def test_failure(self):
        successes, failures = [], []
        d1 = defer.Deferred()
        d2 = defer.Deferred()
        dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
        dlss.addCallbacks(successes.append, failures.append)
        # give each input Deferred an errback that discards its failure
        for each in (d1, d2):
            each.addErrback(lambda _ignore: None)
        d1.callback(1)
        d2.errback(RuntimeError())
        self.failUnlessEqual(successes, [])
        self.failUnlessEqual(len(failures), 1)
        f = failures[0]
        self.failUnless(isinstance(f, failure.Failure))
        self.failUnless(f.check(RuntimeError))
540
class HashUtilTests(unittest.TestCase):
    """Tests for allmydata.util.hashutil."""

    def test_random_key(self):
        key = hashutil.random_key()
        self.failUnlessEqual(len(key), hashutil.KEYLEN)

    def test_sha256d(self):
        one_shot = hashutil.tagged_hash("tag1", "value")
        hasher = hashutil.tagged_hasher("tag1")
        hasher.update("value")
        # digest() is called twice to check that it gives a stable answer
        first = hasher.digest()
        second = hasher.digest()
        self.failUnlessEqual(one_shot, first)
        self.failUnlessEqual(first, second)

    def test_sha256d_truncated(self):
        one_shot = hashutil.tagged_hash("tag1", "value", 16)
        hasher = hashutil.tagged_hasher("tag1", 16)
        hasher.update("value")
        incremental = hasher.digest()
        self.failUnlessEqual(len(one_shot), 16)
        self.failUnlessEqual(len(incremental), 16)
        self.failUnlessEqual(one_shot, incremental)

    def test_chk(self):
        one_shot = hashutil.convergence_hash(3, 10, 1000, "data", "secret")
        hasher = hashutil.convergence_hasher(3, 10, 1000, "secret")
        hasher.update("data")
        self.failUnlessEqual(one_shot, hasher.digest())

    def test_hashers(self):
        # each one-shot hash function must agree with its incremental hasher
        pairs = [
            (hashutil.block_hash, hashutil.block_hasher),
            (hashutil.uri_extension_hash, hashutil.uri_extension_hasher),
            (hashutil.plaintext_hash, hashutil.plaintext_hasher),
            (hashutil.crypttext_hash, hashutil.crypttext_hasher),
            (hashutil.crypttext_segment_hash,
             hashutil.crypttext_segment_hasher),
            (hashutil.plaintext_segment_hash,
             hashutil.plaintext_segment_hasher),
            ]
        for (hash_func, make_hasher) in pairs:
            expected = hash_func("foo")
            hasher = make_hasher()
            hasher.update("foo")
            self.failUnlessEqual(expected, hasher.digest())
602
class Abbreviate(unittest.TestCase):
    """Tests for allmydata.util.abbreviate."""

    def test_time(self):
        a = abbreviate.abbreviate_time
        MIN = 60
        HOUR = 60*MIN
        DAY = 24*HOUR
        MONTH = 30*DAY
        YEAR = 365*DAY
        # (seconds, expected)
        cases = [(None, "unknown"),
                 (0, "0 seconds"),
                 (1, "1 second"),
                 (2, "2 seconds"),
                 (119, "119 seconds"),
                 (2*MIN, "2 minutes"),
                 (60*MIN, "60 minutes"),
                 (179*MIN, "179 minutes"),
                 (180*MIN, "3 hours"),
                 (4*HOUR, "4 hours"),
                 (2*DAY, "2 days"),
                 (2*MONTH, "2 months"),
                 (5*YEAR, "5 years"),
                 ]
        for (seconds, expected) in cases:
            self.failUnlessEqual(a(seconds), expected)

    def test_space(self):
        K, Ki = 1000, 1024
        # decimal (SI) units
        tests_si = [(None, "unknown"),
                    (0, "0 B"),
                    (1, "1 B"),
                    (999, "999 B"),
                    (1000, "1000 B"),
                    (1023, "1023 B"),
                    (1024, "1.02 kB"),
                    (20*K, "20.00 kB"),
                    (Ki*Ki, "1.05 MB"),
                    (K*K, "1.00 MB"),
                    (K*K*K, "1.00 GB"),
                    (K*K*K*K, "1.00 TB"),
                    (K*K*K*K*K, "1.00 PB"),
                    (1234567890123456, "1.23 PB"),
                    ]
        # binary (base-1024) units
        tests_base1024 = [(None, "unknown"),
                          (0, "0 B"),
                          (1, "1 B"),
                          (999, "999 B"),
                          (1000, "1000 B"),
                          (1023, "1023 B"),
                          (1024, "1.00 kiB"),
                          (20*Ki, "20.00 kiB"),
                          (K*K, "976.56 kiB"),
                          (Ki*Ki, "1.00 MiB"),
                          (Ki*Ki*Ki, "1.00 GiB"),
                          (Ki*Ki*Ki*Ki, "1.00 TiB"),
                          (K*K*K*K*K, "909.49 TiB"),
                          (Ki*Ki*Ki*Ki*Ki, "1.00 PiB"),
                          (1234567890123456, "1.10 PiB"),
                          ]
        for (use_si, table) in [(True, tests_si), (False, tests_base1024)]:
            for (x, expected) in table:
                got = abbreviate.abbreviate_space(x, SI=use_si)
                self.failUnlessEqual(got, expected)

        both = abbreviate.abbreviate_space_both(1234567)
        self.failUnlessEqual(both, "(1.23 MB, 1.18 MiB)")

    def test_parse_space(self):
        p = abbreviate.parse_abbreviated_size
        # (text, expected)
        cases = [("", None),
                 (None, None),
                 ("123", 123),
                 ("123B", 123),
                 ("2K", 2000),
                 ("2kb", 2000),
                 ("2KiB", 2048),
                 ("10MB", 10*1000*1000),
                 ("10MiB", 10*1024*1024),
                 ("5G", 5*1000*1000*1000),
                 ("4GiB", 4*1024*1024*1024),
                 ]
        for (text, expected) in cases:
            self.failUnlessEqual(p(text), expected)
        # an unknown unit raises ValueError whose text quotes the input
        e = self.failUnlessRaises(ValueError, p, "12 cubits")
        self.failUnless("12 cubits" in str(e))
683
class Limiter(unittest.TestCase):
    """Tests for limiter.ConcurrencyLimiter."""

    def job(self, i, foo):
        # Record the call and count it as in flight; one second later,
        # drop the in-flight count and fire the Deferred with "done <i>".
        self.calls.append( (i, foo) )
        self.simultaneous += 1
        if self.simultaneous > self.peak_simultaneous:
            self.peak_simultaneous = self.simultaneous
        finished = defer.Deferred()
        def _finish():
            self.simultaneous -= 1
            finished.callback("done %d" % i)
        reactor.callLater(1.0, _finish)
        return finished

    def bad_job(self, i, foo):
        raise RuntimeError("bad_job %d" % i)

    def _reset_counters(self):
        # bookkeeping that job() updates
        self.calls = []
        self.simultaneous = 0
        self.peak_simultaneous = 0

    def _add_twenty_jobs(self, lim):
        # queue job(0, "0") .. job(19, "19"); returns their Deferreds
        return [lim.add(self.job, i, foo=str(i)) for i in range(20)]

    def _check_twenty_calls(self):
        # at most 10 jobs ever ran at once, and all 20 were called
        self.failUnless(self.peak_simultaneous <= 10)
        self.failUnlessEqual(len(self.calls), 20)
        for i in range(20):
            self.failUnless( (i, str(i)) in self.calls)

    def test_limiter(self):
        self._reset_counters()
        lim = limiter.ConcurrencyLimiter()
        dl = self._add_twenty_jobs(lim)
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _done(res):
            self.failUnlessEqual(self.simultaneous, 0)
            self._check_twenty_calls()
        d.addCallback(_done)
        return d

    def test_errors(self):
        self._reset_counters()
        lim = limiter.ConcurrencyLimiter()
        dl = self._add_twenty_jobs(lim)
        # a 21st job that raises; it is not part of the DeferredList
        d2 = lim.add(self.bad_job, 21, "21")
        d = defer.DeferredList(dl, fireOnOneErrback=True)
        def _most_done(res):
            results = []
            for (success, result) in res:
                self.failUnlessEqual(success, True)
                results.append(result)
            results.sort()
            expected_results = ["done %d" % i for i in range(20)]
            expected_results.sort()
            self.failUnlessEqual(results, expected_results)
            self._check_twenty_calls()
            def _good(res):
                self.fail("should have failed, not got %s" % (res,))
            def _err(f):
                f.trap(RuntimeError)
                self.failUnless("bad_job 21" in str(f))
            d2.addCallbacks(_good, _err)
            return d2
        d.addCallback(_most_done)
        def _all_done(res):
            self.failUnlessEqual(self.simultaneous, 0)
            self._check_twenty_calls()
        d.addCallback(_all_done)
        return d
756
class TimeFormat(unittest.TestCase):
    """Tests for the ISO-style timestamp helpers in time_format."""

    def test_epoch(self):
        """Parse and format the instant one second after the epoch.

        Covers the three accepted date/time separators, the sep= and t=
        keywords of iso_utc, rejection of a malformed string, and
        fractional seconds.
        """
        to_seconds = time_format.iso_utc_time_to_localseconds

        # "T", "_" and " " must all work as the date/time separator.
        for timestring in ("1970-01-01T00:00:01",
                           "1970-01-01_00:00:01",
                           "1970-01-01 00:00:01"):
            self.failUnlessEqual(to_seconds(timestring), 1.0)

        self.failUnlessEqual(time_format.iso_utc(1.0), "1970-01-01_00:00:01")
        self.failUnlessEqual(time_format.iso_utc(1.0, sep=" "),
                             "1970-01-01 00:00:01")

        # Here iso_utc is handed a callable through its t= keyword.
        def my_time():
            return 1.0
        self.failUnlessEqual(time_format.iso_utc(t=my_time),
                             "1970-01-01_00:00:01")

        e = self.failUnlessRaises(ValueError, to_seconds,
                                  "invalid timestring")
        self.failUnless("not a complete ISO8601 timestamp" in str(e))

        # A fractional part must survive the conversion.
        self.failUnlessEqual(to_seconds("1970-01-01_00:00:01.500"), 1.5)
780
class CacheDir(unittest.TestCase):
    """Tests for cachedir.CacheDirectoryManager."""

    def test_basic(self):
        """Follow cache files "a", "b" and "c" through repeated cdm.check().

        Expectations encoded below: nothing is removed while all three
        handles are held; dropping the handle for "a" alone is not enough,
        but once cdm.old is negative the next check removes "a"; "b" is
        still present after its handle is dropped while cdm.old is an hour.
        """
        basedir = "test_util/CacheDir/test_basic"

        def _failIfExists(name):
            absfn = os.path.join(basedir, name)
            present = os.path.exists(absfn)
            self.failIf(present, "%s exists but it shouldn't" % absfn)

        def _failUnlessExists(name):
            absfn = os.path.join(basedir, name)
            present = os.path.exists(absfn)
            self.failUnless(present, "%s doesn't exist but it should" % absfn)

        def _expect(gone, kept):
            # Assert on the absent names first, then on the present ones.
            for name in gone:
                _failIfExists(name)
            for name in kept:
                _failUnlessExists(name)

        def _write_hi(cachefile):
            # The argument is only borrowed for the duration of the call, so
            # this helper does not extend the lifetime of the handle.
            fobj = open(cachefile.get_filename(), "wb")
            fobj.write("hi")
            fobj.close()

        cdm = cachedir.CacheDirectoryManager(basedir)
        file_a = cdm.get_file("a")
        file_b = cdm.get_file("b")
        file_c = cdm.get_file("c")
        _write_hi(file_a)
        _write_hi(file_b)
        _write_hi(file_c)

        _expect([], ["a", "b", "c"])

        cdm.check()
        _expect([], ["a", "b", "c"])

        del file_a
        # this file won't be deleted yet, because it isn't old enough
        cdm.check()
        _expect([], ["a", "b", "c"])

        # we change the definition of "old" to make everything old
        cdm.old = -10

        cdm.check()
        _expect(["a"], ["b", "c"])

        cdm.old = 60*60

        del file_b

        cdm.check()
        _expect(["a"], ["b", "c"])

        # Take a fresh handle on "b" and keep it bound through the last check.
        b_again = cdm.get_file("b")

        cdm.check()
        _expect(["a"], ["b", "c"])
843
# Shared counter: each new EqButNotIs instance takes the current value as its
# hash and then bumps it.
ctr = [0]

class EqButNotIs:
    """Wrapper whose comparisons are all delegated to the wrapped value x.

    Each instance draws its own hash from the module-level ctr counter, so
    an instance can compare equal to another object while having a
    different hash and a different identity.
    """
    def __init__(self, x):
        self.x = x
        serial = ctr[0]
        ctr[0] = serial + 1
        self.hash = serial

    def __hash__(self):
        return self.hash

    def __repr__(self):
        klass_name = self.__class__.__name__
        return "<%s %s>" % (klass_name, self.x,)

    # Rich comparisons: always "wrapped value <op> other".
    def __eq__(self, other):
        return self.x == other

    def __ne__(self, other):
        return self.x != other

    def __lt__(self, other):
        return self.x < other

    def __le__(self, other):
        return self.x <= other

    def __gt__(self, other):
        return self.x > other

    def __ge__(self, other):
        return self.x >= other
866
867 class DictUtil(unittest.TestCase):
868     def _help_test_empty_dict(self, klass):
869         d1 = klass()
870         d2 = klass({})
871
872         self.failUnless(d1 == d2, "d1: %r, d2: %r" % (d1, d2,))
873         self.failUnless(len(d1) == 0)
874         self.failUnless(len(d2) == 0)
875
876     def _help_test_nonempty_dict(self, klass):
877         d1 = klass({'a': 1, 'b': "eggs", 3: "spam",})
878         d2 = klass({'a': 1, 'b': "eggs", 3: "spam",})
879
880         self.failUnless(d1 == d2)
881         self.failUnless(len(d1) == 3, "%s, %s" % (len(d1), d1,))
882         self.failUnless(len(d2) == 3)
883
884     def _help_test_eq_but_notis(self, klass):
885         d = klass({'a': 3, 'b': EqButNotIs(3), 'c': 3})
886         d.pop('b')
887
888         d.clear()
889         d['a'] = 3
890         d['b'] = EqButNotIs(3)
891         d['c'] = 3
892         d.pop('b')
893
894         d.clear()
895         d['b'] = EqButNotIs(3)
896         d['a'] = 3
897         d['c'] = 3
898         d.pop('b')
899
900         d.clear()
901         d['a'] = EqButNotIs(3)
902         d['c'] = 3
903         d['a'] = 3
904
905         d.clear()
906         fake3 = EqButNotIs(3)
907         fake7 = EqButNotIs(7)
908         d[fake3] = fake7
909         d[3] = 7
910         d[3] = 8
911         self.failUnless(filter(lambda x: x is 8,  d.itervalues()))
912         self.failUnless(filter(lambda x: x is fake7,  d.itervalues()))
913         # The real 7 should have been ejected by the d[3] = 8.
914         self.failUnless(not filter(lambda x: x is 7,  d.itervalues()))
915         self.failUnless(filter(lambda x: x is fake3,  d.iterkeys()))
916         self.failUnless(filter(lambda x: x is 3,  d.iterkeys()))
917         d[fake3] = 8
918
919         d.clear()
920         d[3] = 7
921         fake3 = EqButNotIs(3)
922         fake7 = EqButNotIs(7)
923         d[fake3] = fake7
924         d[3] = 8
925         self.failUnless(filter(lambda x: x is 8,  d.itervalues()))
926         self.failUnless(filter(lambda x: x is fake7,  d.itervalues()))
927         # The real 7 should have been ejected by the d[3] = 8.
928         self.failUnless(not filter(lambda x: x is 7,  d.itervalues()))
929         self.failUnless(filter(lambda x: x is fake3,  d.iterkeys()))
930         self.failUnless(filter(lambda x: x is 3,  d.iterkeys()))
931         d[fake3] = 8
932
933     def test_all(self):
934         self._help_test_eq_but_notis(dictutil.UtilDict)
935         self._help_test_eq_but_notis(dictutil.NumDict)
936         self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
937         self._help_test_nonempty_dict(dictutil.UtilDict)
938         self._help_test_nonempty_dict(dictutil.NumDict)
939         self._help_test_nonempty_dict(dictutil.ValueOrderedDict)
940         self._help_test_eq_but_notis(dictutil.UtilDict)
941         self._help_test_eq_but_notis(dictutil.NumDict)
942         self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
943
944     def test_dict_of_sets(self):
945         ds = dictutil.DictOfSets()
946         ds.add(1, "a")
947         ds.add(2, "b")
948         ds.add(2, "b")
949         ds.add(2, "c")
950         self.failUnlessEqual(ds[1], set(["a"]))
951         self.failUnlessEqual(ds[2], set(["b", "c"]))
952         ds.discard(3, "d") # should not raise an exception
953         ds.discard(2, "b")
954         self.failUnlessEqual(ds[2], set(["c"]))
955         ds.discard(2, "c")
956         self.failIf(2 in ds)
957
958         ds.union(1, ["a", "e"])
959         ds.union(3, ["f"])
960         self.failUnlessEqual(ds[1], set(["a","e"]))
961         self.failUnlessEqual(ds[3], set(["f"]))
962         ds2 = dictutil.DictOfSets()
963         ds2.add(3, "f")
964         ds2.add(3, "g")
965         ds2.add(4, "h")
966         ds.update(ds2)
967         self.failUnlessEqual(ds[1], set(["a","e"]))
968         self.failUnlessEqual(ds[3], set(["f", "g"]))
969         self.failUnlessEqual(ds[4], set(["h"]))
970
971     def test_move(self):
972         d1 = {1: "a", 2: "b"}
973         d2 = {2: "c", 3: "d"}
974         dictutil.move(1, d1, d2)
975         self.failUnlessEqual(d1, {2: "b"})
976         self.failUnlessEqual(d2, {1: "a", 2: "c", 3: "d"})
977
978         d1 = {1: "a", 2: "b"}
979         d2 = {2: "c", 3: "d"}
980         dictutil.move(2, d1, d2)
981         self.failUnlessEqual(d1, {1: "a"})
982         self.failUnlessEqual(d2, {2: "b", 3: "d"})
983
984         d1 = {1: "a", 2: "b"}
985         d2 = {2: "c", 3: "d"}
986         self.failUnlessRaises(KeyError, dictutil.move, 5, d1, d2, strict=True)
987
988     def test_subtract(self):
989         d1 = {1: "a", 2: "b"}
990         d2 = {2: "c", 3: "d"}
991         d3 = dictutil.subtract(d1, d2)
992         self.failUnlessEqual(d3, {1: "a"})
993
994         d1 = {1: "a", 2: "b"}
995         d2 = {2: "c"}
996         d3 = dictutil.subtract(d1, d2)
997         self.failUnlessEqual(d3, {1: "a"})
998
999     def test_utildict(self):
1000         d = dictutil.UtilDict({1: "a", 2: "b"})
1001         d.del_if_present(1)
1002         d.del_if_present(3)
1003         self.failUnlessEqual(d, {2: "b"})
1004         def eq(a, b):
1005             return a == b
1006         self.failUnlessRaises(TypeError, eq, d, "not a dict")
1007
1008         d = dictutil.UtilDict({1: "b", 2: "a"})
1009         self.failUnlessEqual(d.items_sorted_by_value(),
1010                              [(2, "a"), (1, "b")])
1011         self.failUnlessEqual(d.items_sorted_by_key(),
1012                              [(1, "b"), (2, "a")])
1013         self.failUnlessEqual(repr(d), "{1: 'b', 2: 'a'}")
1014         self.failUnless(1 in d)
1015
1016         d2 = dictutil.UtilDict({3: "c", 4: "d"})
1017         self.failUnless(d != d2)
1018         self.failUnless(d2 > d)
1019         self.failUnless(d2 >= d)
1020         self.failUnless(d <= d2)
1021         self.failUnless(d < d2)
1022         self.failUnlessEqual(d[1], "b")
1023         self.failUnlessEqual(sorted(list([k for k in d])), [1,2])
1024
1025         d3 = d.copy()
1026         self.failUnlessEqual(d, d3)
1027         self.failUnless(isinstance(d3, dictutil.UtilDict))
1028
1029         d4 = d.fromkeys([3,4], "e")
1030         self.failUnlessEqual(d4, {3: "e", 4: "e"})
1031
1032         self.failUnlessEqual(d.get(1), "b")
1033         self.failUnlessEqual(d.get(3), None)
1034         self.failUnlessEqual(d.get(3, "default"), "default")
1035         self.failUnlessEqual(sorted(list(d.items())),
1036                              [(1, "b"), (2, "a")])
1037         self.failUnlessEqual(sorted(list(d.iteritems())),
1038                              [(1, "b"), (2, "a")])
1039         self.failUnlessEqual(sorted(d.keys()), [1, 2])
1040         self.failUnlessEqual(sorted(d.values()), ["a", "b"])
1041         x = d.setdefault(1, "new")
1042         self.failUnlessEqual(x, "b")
1043         self.failUnlessEqual(d[1], "b")
1044         x = d.setdefault(3, "new")
1045         self.failUnlessEqual(x, "new")
1046         self.failUnlessEqual(d[3], "new")
1047         del d[3]
1048
1049         x = d.popitem()
1050         self.failUnless(x in [(1, "b"), (2, "a")])
1051         x = d.popitem()
1052         self.failUnless(x in [(1, "b"), (2, "a")])
1053         self.failUnlessRaises(KeyError, d.popitem)
1054
1055     def test_numdict(self):
1056         d = dictutil.NumDict({"a": 1, "b": 2})
1057
1058         d.add_num("a", 10, 5)
1059         d.add_num("c", 20, 5)
1060         d.add_num("d", 30)
1061         self.failUnlessEqual(d, {"a": 11, "b": 2, "c": 25, "d": 30})
1062
1063         d.subtract_num("a", 10)
1064         d.subtract_num("e", 10)
1065         d.subtract_num("f", 10, 15)
1066         self.failUnlessEqual(d, {"a": 1, "b": 2, "c": 25, "d": 30,
1067                                  "e": -10, "f": 5})
1068
1069         self.failUnlessEqual(d.sum(), sum([1, 2, 25, 30, -10, 5]))
1070
1071         d = dictutil.NumDict()
1072         d.inc("a")
1073         d.inc("a")
1074         d.inc("b", 5)
1075         self.failUnlessEqual(d, {"a": 2, "b": 6})
1076         d.dec("a")
1077         d.dec("c")
1078         d.dec("d", 5)
1079         self.failUnlessEqual(d, {"a": 1, "b": 6, "c": -1, "d": 4})
1080         self.failUnlessEqual(d.items_sorted_by_key(),
1081                              [("a", 1), ("b", 6), ("c", -1), ("d", 4)])
1082         self.failUnlessEqual(d.items_sorted_by_value(),
1083                              [("c", -1), ("a", 1), ("d", 4), ("b", 6)])
1084         self.failUnlessEqual(d.item_with_largest_value(), ("b", 6))
1085
1086         d = dictutil.NumDict({"a": 1, "b": 2})
1087         self.failUnlessEqual(repr(d), "{'a': 1, 'b': 2}")
1088         self.failUnless("a" in d)
1089
1090         d2 = dictutil.NumDict({"c": 3, "d": 4})
1091         self.failUnless(d != d2)
1092         self.failUnless(d2 > d)
1093         self.failUnless(d2 >= d)
1094         self.failUnless(d <= d2)
1095         self.failUnless(d < d2)
1096         self.failUnlessEqual(d["a"], 1)
1097         self.failUnlessEqual(sorted(list([k for k in d])), ["a","b"])
1098         def eq(a, b):
1099             return a == b
1100         self.failUnlessRaises(TypeError, eq, d, "not a dict")
1101
1102         d3 = d.copy()
1103         self.failUnlessEqual(d, d3)
1104         self.failUnless(isinstance(d3, dictutil.NumDict))
1105
1106         d4 = d.fromkeys(["a","b"], 5)
1107         self.failUnlessEqual(d4, {"a": 5, "b": 5})
1108
1109         self.failUnlessEqual(d.get("a"), 1)
1110         self.failUnlessEqual(d.get("c"), 0)
1111         self.failUnlessEqual(d.get("c", 5), 5)
1112         self.failUnlessEqual(sorted(list(d.items())),
1113                              [("a", 1), ("b", 2)])
1114         self.failUnlessEqual(sorted(list(d.iteritems())),
1115                              [("a", 1), ("b", 2)])
1116         self.failUnlessEqual(sorted(d.keys()), ["a", "b"])
1117         self.failUnlessEqual(sorted(d.values()), [1, 2])
1118         self.failUnless(d.has_key("a"))
1119         self.failIf(d.has_key("c"))
1120
1121         x = d.setdefault("c", 3)
1122         self.failUnlessEqual(x, 3)
1123         self.failUnlessEqual(d["c"], 3)
1124         x = d.setdefault("c", 5)
1125         self.failUnlessEqual(x, 3)
1126         self.failUnlessEqual(d["c"], 3)
1127         del d["c"]
1128
1129         x = d.popitem()
1130         self.failUnless(x in [("a", 1), ("b", 2)])
1131         x = d.popitem()
1132         self.failUnless(x in [("a", 1), ("b", 2)])
1133         self.failUnlessRaises(KeyError, d.popitem)
1134
1135         d.update({"c": 3})
1136         d.update({"c": 4, "d": 5})
1137         self.failUnlessEqual(d, {"c": 4, "d": 5})
1138
1139     def test_del_if_present(self):
1140         d = {1: "a", 2: "b"}
1141         dictutil.del_if_present(d, 1)
1142         dictutil.del_if_present(d, 3)
1143         self.failUnlessEqual(d, {2: "b"})
1144
1145     def test_valueordereddict(self):
1146         d = dictutil.ValueOrderedDict()
1147         d["a"] = 3
1148         d["b"] = 2
1149         d["c"] = 1
1150
1151         self.failUnlessEqual(d, {"a": 3, "b": 2, "c": 1})
1152         self.failUnlessEqual(d.items(), [("c", 1), ("b", 2), ("a", 3)])
1153         self.failUnlessEqual(d.values(), [1, 2, 3])
1154         self.failUnlessEqual(d.keys(), ["c", "b", "a"])
1155         self.failUnlessEqual(repr(d), "<ValueOrderedDict {c: 1, b: 2, a: 3}>")
1156         def eq(a, b):
1157             return a == b
1158         self.failIf(d == {"a": 4})
1159         self.failUnless(d != {"a": 4})
1160
1161         x = d.setdefault("d", 0)
1162         self.failUnlessEqual(x, 0)
1163         self.failUnlessEqual(d["d"], 0)
1164         x = d.setdefault("d", -1)
1165         self.failUnlessEqual(x, 0)
1166         self.failUnlessEqual(d["d"], 0)
1167
1168         x = d.remove("e", "default", False)
1169         self.failUnlessEqual(x, "default")
1170         self.failUnlessRaises(KeyError, d.remove, "e", "default", True)
1171         x = d.remove("d", 5)
1172         self.failUnlessEqual(x, 0)
1173
1174         x = d.__getitem__("c")
1175         self.failUnlessEqual(x, 1)
1176         x = d.__getitem__("e", "default", False)
1177         self.failUnlessEqual(x, "default")
1178         self.failUnlessRaises(KeyError, d.__getitem__, "e", "default", True)
1179
1180         self.failUnlessEqual(d.popitem(), ("c", 1))
1181         self.failUnlessEqual(d.popitem(), ("b", 2))
1182         self.failUnlessEqual(d.popitem(), ("a", 3))
1183         self.failUnlessRaises(KeyError, d.popitem)
1184
1185         d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
1186         x = d.pop("d", "default", False)
1187         self.failUnlessEqual(x, "default")
1188         self.failUnlessRaises(KeyError, d.pop, "d", "default", True)
1189         x = d.pop("b")
1190         self.failUnlessEqual(x, 2)
1191         self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
1192
1193         d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
1194         x = d.pop_from_list(1) # pop the second item, b/2
1195         self.failUnlessEqual(x, "b")
1196         self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
1197