def foo(): pass # keep the line number constant: HumanReadable's repr test expects "<foo() at test_util.py:2>", so this def must stay on line 2
5 from StringIO import StringIO
6 from twisted.trial import unittest
7 from twisted.internet import defer, reactor
8 from twisted.python.failure import Failure
9 from twisted.python import log
10 from pycryptopp.hash.sha256 import SHA256 as _hash
12 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
13 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
14 from allmydata.util import limiter, time_format, pollmixin, cachedir
15 from allmydata.util import statistics, dictutil, pipeline
16 from allmydata.util import log as tahoe_log
17 from allmydata.util.spans import Spans, overlap, DataSpans
18 from allmydata.test.common_util import ReallyEqualMixin
21 class Base32(unittest.TestCase):
22 def test_b2a_matches_Pythons(self):
24 y = "\x12\x34\x45\x67\x89\x0a\xbc\xde\xf0"
25 x = base64.b32encode(y)
26 while x and x[-1] == '=':
29 self.failUnlessEqual(base32.b2a(y), x)
31 self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
32 def test_b2a_or_none(self):
33 self.failUnlessEqual(base32.b2a_or_none(None), None)
34 self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
36 self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
37 self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
class IDLib(unittest.TestCase):
    """Checks for idlib.nodeid_b2a."""

    def test_nodeid_b2a(self):
        """Twenty zero bytes are expected to encode as thirty-two 'a' characters."""
        zero_nodeid = "\x00" * 20
        encoded = idlib.nodeid_b2a(zero_nodeid)
        self.failUnlessEqual(encoded, "a" * 32)
43 class NoArgumentException(Exception):
47 class HumanReadable(unittest.TestCase):
50 self.failUnlessEqual(hr(foo), "<foo() at test_util.py:2>")
51 self.failUnlessEqual(hr(self.test_repr),
52 "<bound method HumanReadable.test_repr of <allmydata.test.test_util.HumanReadable testMethod=test_repr>>")
53 self.failUnlessEqual(hr(1L), "1")
54 self.failUnlessEqual(hr(10**40),
55 "100000000000000000...000000000000000000")
56 self.failUnlessEqual(hr(self), "<allmydata.test.test_util.HumanReadable testMethod=test_repr>")
57 self.failUnlessEqual(hr([1,2]), "[1, 2]")
58 self.failUnlessEqual(hr({1:2}), "{1:2}")
63 hr(e) == "<ValueError: ()>" # python-2.4
64 or hr(e) == "ValueError()") # python-2.5
66 raise ValueError("oops")
69 hr(e) == "<ValueError: 'oops'>" # python-2.4
70 or hr(e) == "ValueError('oops',)") # python-2.5
72 raise NoArgumentException
75 hr(e) == "<NoArgumentException>" # python-2.4
76 or hr(e) == "NoArgumentException()") # python-2.5
82 class Math(unittest.TestCase):
83 def test_div_ceil(self):
85 self.failUnlessEqual(f(0, 1), 0)
86 self.failUnlessEqual(f(0, 2), 0)
87 self.failUnlessEqual(f(0, 3), 0)
88 self.failUnlessEqual(f(1, 3), 1)
89 self.failUnlessEqual(f(2, 3), 1)
90 self.failUnlessEqual(f(3, 3), 1)
91 self.failUnlessEqual(f(4, 3), 2)
92 self.failUnlessEqual(f(5, 3), 2)
93 self.failUnlessEqual(f(6, 3), 2)
94 self.failUnlessEqual(f(7, 3), 3)
def test_next_multiple(self):
    """Check mathutil.next_multiple(n, k) against a table of known results."""
    next_multiple = mathutil.next_multiple
    # Each row is (n, k, expected result of next_multiple(n, k)).
    expectations = (
        (5, 1, 5),
        (5, 2, 6),
        (5, 3, 6),
        (5, 4, 8),
        (5, 5, 5),
        (5, 6, 6),
        (32, 1, 32),
        (32, 2, 32),
        (32, 3, 33),
        (32, 4, 32),
        (32, 5, 35),
        (32, 6, 36),
        (32, 7, 35),
        (32, 8, 32),
        (32, 9, 36),
        (32, 10, 40),
        (32, 11, 33),
        (32, 12, 36),
        (32, 13, 39),
        (32, 14, 42),
        (32, 15, 45),
        (32, 16, 32),
        (32, 17, 34),
        (32, 18, 36),
        (32, 589, 589),
    )
    for n, k, expected in expectations:
        self.failUnlessEqual(next_multiple(n, k), expected)
def test_pad_size(self):
    """Check mathutil.pad_size(n, 4) for n in 0..5."""
    pad_size = mathutil.pad_size
    # expected_pads[n] is the value pad_size(n, 4) must return.
    expected_pads = (0, 3, 2, 1, 0, 3)
    for n, expected in enumerate(expected_pads):
        self.failUnlessEqual(pad_size(n, 4), expected)
133 def test_is_power_of_k(self):
134 f = mathutil.is_power_of_k
135 for i in range(1, 100):
136 if i in (1, 2, 4, 8, 16, 32, 64):
137 self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
139 self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
140 for i in range(1, 100):
141 if i in (1, 3, 9, 27, 81):
142 self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
144 self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
146 def test_next_power_of_k(self):
147 f = mathutil.next_power_of_k
148 self.failUnlessEqual(f(0,2), 1)
149 self.failUnlessEqual(f(1,2), 1)
150 self.failUnlessEqual(f(2,2), 2)
151 self.failUnlessEqual(f(3,2), 4)
152 self.failUnlessEqual(f(4,2), 4)
153 for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
154 for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
155 for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
156 for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
157 for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
159 self.failUnlessEqual(f(0,3), 1)
160 self.failUnlessEqual(f(1,3), 1)
161 self.failUnlessEqual(f(2,3), 3)
162 self.failUnlessEqual(f(3,3), 3)
163 for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
164 for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
165 for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
166 for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
170 self.failUnlessEqual(f([1,2,3]), 2)
171 self.failUnlessEqual(f([0,0,0,4]), 1)
172 self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
def test_round_sigfigs(self):
    """22/3 rounded with mathutil.round_sigfigs(..., 4) must equal the pinned float."""
    rounded = mathutil.round_sigfigs(22.0/3, 4)
    self.failUnlessEqual(rounded, 7.3330000000000002)
178 class Statistics(unittest.TestCase):
179 def should_assert(self, msg, func, *args, **kwargs):
181 func(*args, **kwargs)
183 except AssertionError:
def failUnlessListEqual(self, a, b, msg=None):
    """Assert that sequences a and b have equal length and equal elements.

    The length check runs first (without msg); every element pair is then
    compared in order with failUnlessEqual, passing msg through.
    """
    self.failUnlessEqual(len(a), len(b))
    for left, right in zip(a, b):
        self.failUnlessEqual(left, right, msg)
def failUnlessListAlmostEqual(self, a, b, places=7, msg=None):
    """Assert that sequences a and b have equal length and almost-equal elements.

    The length check runs first; every element pair is then compared in
    order with failUnlessAlmostEqual, passing places and msg through.
    """
    self.failUnlessEqual(len(a), len(b))
    for left, right in zip(a, b):
        self.failUnlessAlmostEqual(left, right, places, msg)
def test_binomial_coeff(self):
    """Check statistics.binomial_coeff on small cases, symmetry, and n < k."""
    binom = statistics.binomial_coeff
    # (k, expected value of binomial_coeff(20, k))
    for k, expected in ((0, 1), (1, 20), (2, 190)):
        self.failUnlessEqual(binom(20, k), expected)
    # Choosing 8 of 20 must give the same count as choosing 12 of 20.
    self.failUnlessEqual(binom(20, 8), binom(20, 12))
    self.should_assert("Should assert if n < k", binom, 2, 3)
204 def test_binomial_distribution_pmf(self):
205 f = statistics.binomial_distribution_pmf
208 pmf_stat = [0.81, 0.18, 0.01]
209 self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)
211 # Summing across a PMF should give the total probability 1
212 self.failUnlessAlmostEqual(sum(pmf_comp), 1)
213 self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
214 self.should_assert("Should assert if n < 1", f, 0, .1)
217 statistics.print_pmf(pmf_comp, out=out)
218 lines = out.getvalue().splitlines()
219 self.failUnlessEqual(lines[0], "i=0: 0.81")
220 self.failUnlessEqual(lines[1], "i=1: 0.18")
221 self.failUnlessEqual(lines[2], "i=2: 0.01")
223 def test_survival_pmf(self):
224 f = statistics.survival_pmf
225 # Cross-check binomial-distribution method against convolution
227 p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
228 pmf1 = statistics.survival_pmf_via_conv(p_list)
229 pmf2 = statistics.survival_pmf_via_bd(p_list)
230 self.failUnlessListAlmostEqual(pmf1, pmf2)
231 self.failUnlessTrue(statistics.valid_pmf(pmf1))
232 self.should_assert("Should assert if p_i > 1", f, [1.1]);
233 self.should_assert("Should assert if p_i < 0", f, [-.1]);
235 def test_repair_count_pmf(self):
236 survival_pmf = statistics.binomial_distribution_pmf(5, .9)
237 repair_pmf = statistics.repair_count_pmf(survival_pmf, 3)
238 # repair_pmf[0] == sum(survival_pmf[0,1,2,5])
239 # repair_pmf[1] == survival_pmf[4]
240 # repair_pmf[2] = survival_pmf[3]
241 self.failUnlessListAlmostEqual(repair_pmf,
242 [0.00001 + 0.00045 + 0.0081 + 0.59049,
247 def test_repair_cost(self):
248 survival_pmf = statistics.binomial_distribution_pmf(5, .9)
249 bwcost = statistics.bandwidth_cost_function
250 cost = statistics.mean_repair_cost(bwcost, 1000,
251 survival_pmf, 3, ul_dl_ratio=1.0)
252 self.failUnlessAlmostEqual(cost, 558.90)
253 cost = statistics.mean_repair_cost(bwcost, 1000,
254 survival_pmf, 3, ul_dl_ratio=8.0)
255 self.failUnlessAlmostEqual(cost, 1664.55)
257 # I haven't manually checked the math beyond here -warner
258 cost = statistics.eternal_repair_cost(bwcost, 1000,
260 discount_rate=0, ul_dl_ratio=1.0)
261 self.failUnlessAlmostEqual(cost, 65292.056074766246)
262 cost = statistics.eternal_repair_cost(bwcost, 1000,
266 self.failUnlessAlmostEqual(cost, 9133.6097158191551)
268 def test_convolve(self):
269 f = statistics.convolve
273 v1v2result = [ 4, 13, 28, 27, 18 ]
274 # Convolution is commutative
277 self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
278 self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
279 # Convolution is associative
280 r1 = f(f(v1, v2), v3)
281 r2 = f(v1, f(v2, v3))
282 self.failUnlessListEqual(r1, r2, "Convolution should be associative")
283 # Convolution is distributive
284 r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
287 r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
288 self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
289 # Convolution is scalar multiplication associative
291 r1 = [ a * 4 for a in tmp1 ]
292 tmp2 = [ a * 4 for a in v1 ]
294 self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")
296 def test_find_k(self):
297 f = statistics.find_k
298 g = statistics.pr_file_loss
299 plist = [.9] * 10 + [.8] * 10 # N=20
302 self.failUnlessEqual(k, 10)
303 self.failUnless(g(plist, k) < t)
305 def test_pr_file_loss(self):
306 f = statistics.pr_file_loss
308 self.failUnlessEqual(f(plist, 3), .0546875)
310 def test_pr_backup_file_loss(self):
311 f = statistics.pr_backup_file_loss
313 self.failUnlessEqual(f(plist, .5, 3), .02734375)
316 class Asserts(unittest.TestCase):
317 def should_assert(self, func, *args, **kwargs):
319 func(*args, **kwargs)
320 except AssertionError, e:
323 self.fail("assert failed with non-AssertionError: %s" % e)
324 self.fail("assert was not caught")
326 def should_not_assert(self, func, *args, **kwargs):
328 func(*args, **kwargs)
329 except AssertionError, e:
330 self.fail("assertion fired when it should not have: %s" % e)
332 self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
336 def test_assert(self):
337 f = assertutil._assert
338 self.should_assert(f)
339 self.should_assert(f, False)
340 self.should_not_assert(f, True)
342 m = self.should_assert(f, False, "message")
343 self.failUnlessEqual(m, "'message' <type 'str'>", m)
344 m = self.should_assert(f, False, "message1", othermsg=12)
345 self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
346 m = self.should_assert(f, False, othermsg="message2")
347 self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
349 def test_precondition(self):
350 f = assertutil.precondition
351 self.should_assert(f)
352 self.should_assert(f, False)
353 self.should_not_assert(f, True)
355 m = self.should_assert(f, False, "message")
356 self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
357 m = self.should_assert(f, False, "message1", othermsg=12)
358 self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
359 m = self.should_assert(f, False, othermsg="message2")
360 self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
362 def test_postcondition(self):
363 f = assertutil.postcondition
364 self.should_assert(f)
365 self.should_assert(f, False)
366 self.should_not_assert(f, True)
368 m = self.should_assert(f, False, "message")
369 self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
370 m = self.should_assert(f, False, "message1", othermsg=12)
371 self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
372 m = self.should_assert(f, False, othermsg="message2")
373 self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
375 class FileUtil(ReallyEqualMixin, unittest.TestCase):
def mkdir(self, basedir, path, mode=0o777):
    """Create the directory basedir/path by delegating to fileutil.make_dirs.

    basedir and path are joined with os.path.join; mode is passed through
    to fileutil.make_dirs unchanged. The default is written as 0o777 (same
    value as the old 0777 spelling) so the literal parses on Python 2.6+
    and Python 3 alike.
    """
    target = os.path.join(basedir, path)
    fileutil.make_dirs(target, mode)
380 def touch(self, basedir, path, mode=None, data="touch\n"):
381 fn = os.path.join(basedir, path)
388 def test_rm_dir(self):
389 basedir = "util/FileUtil/test_rm_dir"
390 fileutil.make_dirs(basedir)
391 # create it again to test idempotency
392 fileutil.make_dirs(basedir)
393 d = os.path.join(basedir, "doomed")
395 self.touch(d, "a/b/1.txt")
396 self.touch(d, "a/b/2.txt", 0444)
397 self.touch(d, "a/b/3.txt", 0)
399 self.touch(d, "a/c/1.txt")
400 self.touch(d, "a/c/2.txt", 0444)
401 self.touch(d, "a/c/3.txt", 0)
402 os.chmod(os.path.join(d, "a/c"), 0444)
404 self.touch(d, "a/d/1.txt")
405 self.touch(d, "a/d/2.txt", 0444)
406 self.touch(d, "a/d/3.txt", 0)
407 os.chmod(os.path.join(d, "a/d"), 0)
410 self.failIf(os.path.exists(d))
411 # remove it again to test idempotency
def test_remove_if_possible(self):
    """Call fileutil.remove_if_possible three times on the same path.

    The first call must leave the path absent; the later calls only need
    to return without raising.
    """
    workdir = "util/FileUtil/test_remove_if_possible"
    target = os.path.join(workdir, "here")
    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")

    fileutil.remove_if_possible(target)
    self.failIf(os.path.exists(target))

    # should be idempotent
    fileutil.remove_if_possible(target)

    # should survive errors (the containing directory is removed first)
    fileutil.rm_dir(workdir)
    fileutil.remove_if_possible(target)
def test_write_atomically(self):
    """Write a file twice with fileutil.write_atomically and read each result back."""
    workdir = "util/FileUtil/test_write_atomically"
    fileutil.make_dirs(workdir)
    target = os.path.join(workdir, "here")
    # First pass uses the default mode; second passes mode="" (non-binary).
    for payload, extra_kwargs in (("one", {}), ("two", {"mode": ""})):
        fileutil.write_atomically(target, payload, **extra_kwargs)
        self.failUnlessEqual(fileutil.read(target), payload)
def test_rename(self):
    """After fileutil.rename(src, dst), src must be gone and dst must exist."""
    workdir = "util/FileUtil/test_rename"
    fileutil.make_dirs(workdir)
    self.touch(workdir, "here")
    src = os.path.join(workdir, "here")
    dst = os.path.join(workdir, "there")

    fileutil.rename(src, dst)

    self.failIf(os.path.exists(src))
    self.failUnless(os.path.exists(dst))
444 def test_rename_no_overwrite(self):
445 workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
446 fileutil.make_dirs(workdir)
448 source_path = os.path.join(workdir, "source")
449 dest_path = os.path.join(workdir, "dest")
451 # when neither file exists
452 self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
454 # when only dest exists
455 fileutil.write(dest_path, "dest")
456 self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
457 self.failUnlessEqual(fileutil.read(dest_path), "dest")
460 fileutil.write(source_path, "source")
461 self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
462 self.failUnlessEqual(fileutil.read(source_path), "source")
463 self.failUnlessEqual(fileutil.read(dest_path), "dest")
465 # when only source exists
467 fileutil.rename_no_overwrite(source_path, dest_path)
468 self.failUnlessEqual(fileutil.read(dest_path), "source")
469 self.failIf(os.path.exists(source_path))
471 def test_replace_file(self):
472 workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
473 fileutil.make_dirs(workdir)
475 backup_path = os.path.join(workdir, "backup")
476 replaced_path = os.path.join(workdir, "replaced")
477 replacement_path = os.path.join(workdir, "replacement")
479 # when none of the files exist
480 self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
482 # when only replaced exists
483 fileutil.write(replaced_path, "foo")
484 self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
485 self.failUnlessEqual(fileutil.read(replaced_path), "foo")
487 # when both replaced and replacement exist, but not backup
488 fileutil.write(replacement_path, "bar")
489 fileutil.replace_file(replaced_path, replacement_path, backup_path)
490 self.failUnlessEqual(fileutil.read(backup_path), "foo")
491 self.failUnlessEqual(fileutil.read(replaced_path), "bar")
492 self.failIf(os.path.exists(replacement_path))
494 # when only replacement exists
495 os.remove(backup_path)
496 os.remove(replaced_path)
497 fileutil.write(replacement_path, "bar")
498 fileutil.replace_file(replaced_path, replacement_path, backup_path)
499 self.failUnlessEqual(fileutil.read(replaced_path), "bar")
500 self.failIf(os.path.exists(replacement_path))
501 self.failIf(os.path.exists(backup_path))
503 # when replaced, replacement and backup all exist
504 fileutil.write(replaced_path, "foo")
505 fileutil.write(replacement_path, "bar")
506 fileutil.write(backup_path, "bak")
507 fileutil.replace_file(replaced_path, replacement_path, backup_path)
508 self.failUnlessEqual(fileutil.read(backup_path), "foo")
509 self.failUnlessEqual(fileutil.read(replaced_path), "bar")
510 self.failIf(os.path.exists(replacement_path))
513 basedir = "util/FileUtil/test_du"
514 fileutil.make_dirs(basedir)
515 d = os.path.join(basedir, "space-consuming")
517 self.touch(d, "a/b/1.txt", data="a"*10)
518 self.touch(d, "a/b/2.txt", data="b"*11)
520 self.touch(d, "a/c/1.txt", data="c"*12)
521 self.touch(d, "a/c/2.txt", data="d"*13)
523 used = fileutil.du(basedir)
524 self.failUnlessEqual(10+11+12+13, used)
526 def test_abspath_expanduser_unicode(self):
527 self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, "bytestring")
529 saved_cwd = os.path.normpath(os.getcwdu())
530 abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
531 abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
532 self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
533 self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
534 if sys.platform == "win32":
535 self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
537 self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
538 self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
540 self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
541 self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
542 self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
543 self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
544 self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
546 # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
548 foo = fileutil.abspath_expanduser_unicode(u"foo")
549 self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
551 foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
552 self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
554 if sys.platform == "win32":
555 # This is checking that a drive letter is added for a path without one.
556 baz = fileutil.abspath_expanduser_unicode(u"\\baz")
557 self.failUnless(baz.startswith(u"\\\\?\\"), baz)
558 self.failUnlessReallyEqual(baz[5 :], u":\\baz")
560 bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
561 self.failUnless(bar.startswith(u"\\\\?\\"), bar)
562 self.failUnlessReallyEqual(bar[5 :], u":\\bar")
563 # not u":\\baz\\bar", because \bar is absolute on the current drive.
565 self.failUnlessReallyEqual(baz[4], bar[4]) # same drive
567 baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
568 self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
569 self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
571 bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz, long_path=False)
572 self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
573 self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
574 # not u":\\baz\\bar", because \bar is absolute on the current drive.
576 self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0]) # same drive
578 self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
579 self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
583 cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
585 except UnicodeEncodeError:
586 pass # the cwd can't be encoded -- test with ascii cwd only
592 for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
593 uabspath = fileutil.abspath_expanduser_unicode(upath)
594 self.failUnless(isinstance(uabspath, unicode), uabspath)
596 uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
597 self.failUnless(isinstance(uabspath_notlong, unicode), uabspath_notlong)
601 def test_create_long_path(self):
602 workdir = u"test_create_long_path"
603 fileutil.make_dirs(workdir)
604 long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255))
606 fileutil.remove(long_path)
607 self.addCleanup(_cleanup)
609 fileutil.write(long_path, "test")
610 self.failUnless(os.path.exists(long_path))
611 self.failUnlessEqual(fileutil.read(long_path), "test")
613 self.failIf(os.path.exists(long_path))
615 def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
616 def call_windows_getenv(name):
617 if name == u"USERPROFILE": return userprofile
618 if name == u"HOMEDRIVE": return homedrive
619 if name == u"HOMEPATH": return homepath
620 self.fail("unexpected argument to call_windows_getenv")
621 self.patch(fileutil, 'windows_getenv', call_windows_getenv)
623 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
624 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
625 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
626 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
627 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
628 self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
630 def test_windows_expanduser_xp(self):
631 return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
633 def test_windows_expanduser_win7(self):
634 return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
636 def test_disk_stats(self):
637 avail = fileutil.get_available_space('.', 2**14)
639 raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
641 disk = fileutil.get_disk_stats('.', 2**13)
642 self.failUnless(disk['total'] > 0, disk['total'])
643 # we tolerate used==0 for a Travis-CI bug, see #2290
644 self.failUnless(disk['used'] >= 0, disk['used'])
645 self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
646 self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
647 self.failUnless(disk['avail'] > 0, disk['avail'])
def test_disk_stats_avail_nonnegative(self):
    """get_disk_stats must report 'avail' as exactly 0 when given 2**128.

    This test will spuriously fail if you have more than 2^128 bytes of
    available space on your filesystem.
    """
    # NOTE(review): the second argument is presumably a reserved-space
    # amount -- confirm against fileutil.get_disk_stats.
    huge_amount = 2**128
    stats = fileutil.get_disk_stats('.', huge_amount)
    self.failUnlessEqual(stats['avail'], 0)
655 def test_get_pathinfo(self):
656 basedir = "util/FileUtil/test_get_pathinfo"
657 fileutil.make_dirs(basedir)
660 self.mkdir(basedir, "a")
661 dirinfo = fileutil.get_pathinfo(basedir)
662 self.failUnlessTrue(dirinfo.isdir)
663 self.failUnlessTrue(dirinfo.exists)
664 self.failUnlessFalse(dirinfo.isfile)
665 self.failUnlessFalse(dirinfo.islink)
667 # create a file under the directory
668 f = os.path.join(basedir, "a", "1.txt")
669 self.touch(basedir, "a/1.txt", data="a"*10)
670 fileinfo = fileutil.get_pathinfo(f)
671 self.failUnlessTrue(fileinfo.isfile)
672 self.failUnlessTrue(fileinfo.exists)
673 self.failUnlessFalse(fileinfo.isdir)
674 self.failUnlessFalse(fileinfo.islink)
675 self.failUnlessEqual(fileinfo.size, 10)
677 # create a symlink under the directory a pointing to 1.txt
678 slname = os.path.join(basedir, "a", "linkto1.txt")
679 os.symlink(f, slname)
680 symlinkinfo = fileutil.get_pathinfo(slname)
681 self.failUnlessTrue(symlinkinfo.islink)
682 self.failUnlessTrue(symlinkinfo.exists)
683 self.failUnlessFalse(symlinkinfo.isfile)
684 self.failUnlessFalse(symlinkinfo.isdir)
686 # path at which nothing exists
687 dnename = os.path.join(basedir, "a", "doesnotexist")
689 dneinfo = fileutil.get_pathinfo(dnename, now=now)
690 self.failUnlessFalse(dneinfo.exists)
691 self.failUnlessFalse(dneinfo.isfile)
692 self.failUnlessFalse(dneinfo.isdir)
693 self.failUnlessFalse(dneinfo.islink)
694 self.failUnlessEqual(dneinfo.size, None)
695 self.failUnlessEqual(dneinfo.mtime, now)
696 self.failUnlessEqual(dneinfo.ctime, now)
699 class PollMixinTests(unittest.TestCase):
701 self.pm = pollmixin.PollMixin()
703 def test_PollMixin_True(self):
704 d = self.pm.poll(check_f=lambda : True,
708 def test_PollMixin_False_then_True(self):
709 i = iter([False, True])
710 d = self.pm.poll(check_f=i.next,
714 def test_timeout(self):
715 d = self.pm.poll(check_f=lambda: False,
719 self.fail("poll should have failed, not returned %s" % (res,))
721 f.trap(pollmixin.TimeoutError)
722 return None # success
723 d.addCallbacks(_suc, _err)
726 class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
727 def test_gather_results(self):
728 d1 = defer.Deferred()
729 d2 = defer.Deferred()
730 res = deferredutil.gatherResults([d1, d2])
731 d1.errback(ValueError("BAD"))
733 self.fail("Should have errbacked, not resulted in %s" % (res,))
735 thef.trap(ValueError)
736 res.addCallbacks(_callb, _errb)
739 def test_success(self):
740 d1, d2 = defer.Deferred(), defer.Deferred()
743 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
744 dlss.addCallbacks(good.append, bad.append)
747 self.failUnlessEqual(good, [[1,2]])
748 self.failUnlessEqual(bad, [])
750 def test_failure(self):
751 d1, d2 = defer.Deferred(), defer.Deferred()
754 dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
755 dlss.addCallbacks(good.append, bad.append)
756 d1.addErrback(lambda _ignore: None)
757 d2.addErrback(lambda _ignore: None)
759 d2.errback(ValueError())
760 self.failUnlessEqual(good, [])
761 self.failUnlessEqual(len(bad), 1)
763 self.failUnless(isinstance(f, Failure))
764 self.failUnless(f.check(ValueError))
766 def test_wait_for_delayed_calls(self):
768 This tests that 'wait_for_delayed_calls' does in fact wait for a
769 delayed call that is active when the test returns. If it didn't,
770 Trial would report an unclean reactor error for this test.
775 reactor.callLater(0.1, _trigger)
777 d = defer.succeed(None)
778 d.addBoth(self.wait_for_delayed_calls)
781 class HashUtilTests(unittest.TestCase):
def test_random_key(self):
    """hashutil.random_key() must return something of length hashutil.KEYLEN."""
    key_length = len(hashutil.random_key())
    self.failUnlessEqual(key_length, hashutil.KEYLEN)
787 def test_sha256d(self):
788 h1 = hashutil.tagged_hash("tag1", "value")
789 h2 = hashutil.tagged_hasher("tag1")
793 self.failUnlessEqual(h1, h2a)
794 self.failUnlessEqual(h2a, h2b)
796 def test_sha256d_truncated(self):
797 h1 = hashutil.tagged_hash("tag1", "value", 16)
798 h2 = hashutil.tagged_hasher("tag1", 16)
801 self.failUnlessEqual(len(h1), 16)
802 self.failUnlessEqual(len(h2), 16)
803 self.failUnlessEqual(h1, h2)
806 h1 = hashutil.convergence_hash(3, 10, 1000, "data", "secret")
807 h2 = hashutil.convergence_hasher(3, 10, 1000, "secret")
810 self.failUnlessEqual(h1, h2)
812 def test_hashers(self):
813 h1 = hashutil.block_hash("foo")
814 h2 = hashutil.block_hasher()
816 self.failUnlessEqual(h1, h2.digest())
818 h1 = hashutil.uri_extension_hash("foo")
819 h2 = hashutil.uri_extension_hasher()
821 self.failUnlessEqual(h1, h2.digest())
823 h1 = hashutil.plaintext_hash("foo")
824 h2 = hashutil.plaintext_hasher()
826 self.failUnlessEqual(h1, h2.digest())
828 h1 = hashutil.crypttext_hash("foo")
829 h2 = hashutil.crypttext_hasher()
831 self.failUnlessEqual(h1, h2.digest())
833 h1 = hashutil.crypttext_segment_hash("foo")
834 h2 = hashutil.crypttext_segment_hasher()
836 self.failUnlessEqual(h1, h2.digest())
838 h1 = hashutil.plaintext_segment_hash("foo")
839 h2 = hashutil.plaintext_segment_hasher()
841 self.failUnlessEqual(h1, h2.digest())
def test_timing_safe_compare(self):
    """hashutil.timing_safe_compare is truthy for equal strings, falsy otherwise."""
    compare = hashutil.timing_safe_compare
    matching_pairs = (("a", "a"), ("ab", "ab"))
    # Differing content, and a prefix of different length.
    differing_pairs = (("a", "b"), ("a", "aa"))
    for left, right in matching_pairs:
        self.failUnless(compare(left, right))
    for left, right in differing_pairs:
        self.failIf(compare(left, right))
849 def _testknown(self, hashf, expected_a, *args):
851 got_a = base32.b2a(got)
852 self.failUnlessEqual(got_a, expected_a)
def test_known_answers(self):
    """Pin each hashutil function to a known answer via self._testknown."""
    # assert backwards compatibility
    zeros20 = "\x00"*20
    # Each row is (hash function, expected answer string, positional args).
    known_answers = (
        (hashutil.storage_index_hash, "qb5igbhcc5esa6lwqorsy7e6am", ("",)),
        (hashutil.block_hash, "msjr5bh4evuh7fa3zw7uovixfbvlnstr5b65mrerwfnvjxig2jvq", ("",)),
        (hashutil.uri_extension_hash, "wthsu45q7zewac2mnivoaa4ulh5xvbzdmsbuyztq2a5fzxdrnkka", ("",)),
        (hashutil.plaintext_hash, "5lz5hwz3qj3af7n6e3arblw7xzutvnd3p3fjsngqjcb7utf3x3da", ("",)),
        (hashutil.crypttext_hash, "itdj6e4njtkoiavlrmxkvpreosscssklunhwtvxn6ggho4rkqwga", ("",)),
        (hashutil.crypttext_segment_hash, "aovy5aa7jej6ym5ikgwyoi4pxawnoj3wtaludjz7e2nb5xijb7aa", ("",)),
        (hashutil.plaintext_segment_hash, "4fdgf6qruaisyukhqcmoth4t3li6bkolbxvjy4awwcpprdtva7za", ("",)),
        (hashutil.convergence_hash, "3mo6ni7xweplycin6nowynw2we", (3, 10, 100, "", "converge")),
        (hashutil.my_renewal_secret_hash, "ujhr5k5f7ypkp67jkpx6jl4p47pyta7hu5m527cpcgvkafsefm6q", ("",)),
        (hashutil.my_cancel_secret_hash, "rjwzmafe2duixvqy6h47f5wfrokdziry6zhx4smew4cj6iocsfaa", ("",)),
        (hashutil.file_renewal_secret_hash, "hzshk2kf33gzbd5n3a6eszkf6q6o6kixmnag25pniusyaulqjnia", ("", "si")),
        (hashutil.file_cancel_secret_hash, "bfciwvr6w7wcavsngxzxsxxaszj72dej54n4tu2idzp6b74g255q", ("", "si")),
        (hashutil.bucket_renewal_secret_hash, "e7imrzgzaoashsncacvy3oysdd2m5yvtooo4gmj4mjlopsazmvuq", ("", zeros20)),
        (hashutil.bucket_cancel_secret_hash, "dvdujeyxeirj6uux6g7xcf4lvesk632aulwkzjar7srildvtqwma", ("", zeros20)),
        (hashutil.hmac, "c54ypfi6pevb3nvo6ba42jtglpkry2kbdopqsi7dgrm4r7tw5sra", ("tag", "")),
        (hashutil.mutable_rwcap_key_hash, "6rvn2iqrghii5n4jbbwwqqsnqu", ("iv", "wk")),
        (hashutil.ssk_writekey_hash, "ykpgmdbpgbb6yqz5oluw2q26ye", ("",)),
        (hashutil.ssk_write_enabler_master_hash, "izbfbfkoait4dummruol3gy2bnixrrrslgye6ycmkuyujnenzpia", ("",)),
        (hashutil.ssk_write_enabler_hash, "fuu2dvx7g6gqu5x22vfhtyed7p4pd47y5hgxbqzgrlyvxoev62tq", ("wk", zeros20)),
        (hashutil.ssk_pubkey_fingerprint_hash, "3opzw4hhm2sgncjx224qmt5ipqgagn7h5zivnfzqycvgqgmgz35q", ("",)),
        (hashutil.ssk_readkey_hash, "vugid4as6qbqgeq2xczvvcedai", ("",)),
        (hashutil.ssk_readkey_data_hash, "73wsaldnvdzqaf7v4pzbr2ae5a", ("iv", "rk")),
        (hashutil.ssk_storage_index_hash, "j7icz6kigb6hxrej3tv4z7ayym", ("",)),
    )
    for hashf, expected_a, args in known_answers:
        self._testknown(hashf, expected_a, *args)
881 class Abbreviate(unittest.TestCase):
883 a = abbreviate.abbreviate_time
884 self.failUnlessEqual(a(None), "unknown")
885 self.failUnlessEqual(a(0), "0 seconds")
886 self.failUnlessEqual(a(1), "1 second")
887 self.failUnlessEqual(a(2), "2 seconds")
888 self.failUnlessEqual(a(119), "119 seconds")
890 self.failUnlessEqual(a(2*MIN), "2 minutes")
891 self.failUnlessEqual(a(60*MIN), "60 minutes")
892 self.failUnlessEqual(a(179*MIN), "179 minutes")
894 self.failUnlessEqual(a(180*MIN), "3 hours")
895 self.failUnlessEqual(a(4*HOUR), "4 hours")
898 self.failUnlessEqual(a(2*DAY), "2 days")
899 self.failUnlessEqual(a(2*MONTH), "2 months")
901 self.failUnlessEqual(a(5*YEAR), "5 years")
903 def test_space(self):
904 tests_si = [(None, "unknown"),
911 (20*1000, "20.00 kB"),
912 (1024*1024, "1.05 MB"),
913 (1000*1000, "1.00 MB"),
914 (1000*1000*1000, "1.00 GB"),
915 (1000*1000*1000*1000, "1.00 TB"),
916 (1000*1000*1000*1000*1000, "1.00 PB"),
917 (1000*1000*1000*1000*1000*1000, "1.00 EB"),
918 (1234567890123456789, "1.23 EB"),
920 for (x, expected) in tests_si:
921 got = abbreviate.abbreviate_space(x, SI=True)
922 self.failUnlessEqual(got, expected)
924 tests_base1024 = [(None, "unknown"),
931 (20*1024, "20.00 kiB"),
932 (1000*1000, "976.56 kiB"),
933 (1024*1024, "1.00 MiB"),
934 (1024*1024*1024, "1.00 GiB"),
935 (1024*1024*1024*1024, "1.00 TiB"),
936 (1000*1000*1000*1000*1000, "909.49 TiB"),
937 (1024*1024*1024*1024*1024, "1.00 PiB"),
938 (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
939 (1234567890123456789, "1.07 EiB"),
941 for (x, expected) in tests_base1024:
942 got = abbreviate.abbreviate_space(x, SI=False)
943 self.failUnlessEqual(got, expected)
945 self.failUnlessEqual(abbreviate.abbreviate_space_both(1234567),
946 "(1.23 MB, 1.18 MiB)")
def test_parse_space(self):
    # Table-driven check of abbreviate.parse_abbreviated_size: every input
    # on the left must parse to the value on the right.
    parse = abbreviate.parse_abbreviated_size
    accepted = [
        ("", None),
        (None, None),
        ("123", 123),
        ("123B", 123),
        ("2K", 2000),
        ("2kb", 2000),
        ("2KiB", 2048),
        ("10MB", 10 * 1000**2),
        ("10MiB", 10 * 1024**2),
        ("5G", 5 * 1000**3),
        ("4GiB", 4 * 1024**3),
        ("3TB", 3 * 1000**4),
        ("3TiB", 3 * 1024**4),
        ("6PB", 6 * 1000**5),
        ("6PiB", 6 * 1024**5),
        ("9EB", 9 * 1000**6),
        ("9EiB", 9 * 1024**6),
    ]
    for (text, expected) in accepted:
        self.failUnlessEqual(parse(text), expected)

    # Inputs that cannot be parsed must raise ValueError, and the error
    # text must mention the offending input.
    for bogus in ("12 cubits", "1 BB", "fhtagn"):
        err = self.failUnlessRaises(ValueError, parse, bogus)
        self.failUnlessIn(bogus, str(err))
975 class Limiter(unittest.TestCase):
976 timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
978 def job(self, i, foo):
979 self.calls.append( (i, foo) )
980 self.simultaneous += 1
981 self.peak_simultaneous = max(self.simultaneous, self.peak_simultaneous)
984 self.simultaneous -= 1
985 d.callback("done %d" % i)
986 reactor.callLater(1.0, _done)
def bad_job(self, i, foo):
    # A job that always fails synchronously with a ValueError naming its
    # index; `foo` is accepted but not used.
    message = "bad_job %d" % i
    raise ValueError(message)
992 def test_limiter(self):
994 self.simultaneous = 0
995 self.peak_simultaneous = 0
996 l = limiter.ConcurrencyLimiter()
999 dl.append(l.add(self.job, i, foo=str(i)))
1000 d = defer.DeferredList(dl, fireOnOneErrback=True)
1002 self.failUnlessEqual(self.simultaneous, 0)
1003 self.failUnless(self.peak_simultaneous <= 10)
1004 self.failUnlessEqual(len(self.calls), 20)
1006 self.failUnless( (i, str(i)) in self.calls)
1007 d.addCallback(_done)
1010 def test_errors(self):
1012 self.simultaneous = 0
1013 self.peak_simultaneous = 0
1014 l = limiter.ConcurrencyLimiter()
1017 dl.append(l.add(self.job, i, foo=str(i)))
1018 d2 = l.add(self.bad_job, 21, "21")
1019 d = defer.DeferredList(dl, fireOnOneErrback=True)
1020 def _most_done(res):
1022 for (success, result) in res:
1023 self.failUnlessEqual(success, True)
1024 results.append(result)
1026 expected_results = ["done %d" % i for i in range(20)]
1027 expected_results.sort()
1028 self.failUnlessEqual(results, expected_results)
1029 self.failUnless(self.peak_simultaneous <= 10)
1030 self.failUnlessEqual(len(self.calls), 20)
1032 self.failUnless( (i, str(i)) in self.calls)
1034 self.fail("should have failed, not got %s" % (res,))
1037 self.failUnless("bad_job 21" in str(f))
1038 d2.addCallbacks(_good, _err)
1040 d.addCallback(_most_done)
1042 self.failUnlessEqual(self.simultaneous, 0)
1043 self.failUnless(self.peak_simultaneous <= 10)
1044 self.failUnlessEqual(len(self.calls), 20)
1046 self.failUnless( (i, str(i)) in self.calls)
1047 d.addCallback(_all_done)
1050 class TimeFormat(unittest.TestCase):
def test_epoch(self):
    # Run the shared epoch checks and hand back whatever they return.
    outcome = self._help_test_epoch()
    return outcome
1054 def test_epoch_in_London(self):
1055 # Europe/London is a particularly troublesome timezone. Nowadays, its
1056 # offset from GMT is 0. But in 1970, its offset from GMT was 1.
1057 # (Apparently in 1970 Britain had redefined standard time to be GMT+1
1058 # and stayed in standard time all year round, whereas today
1059 # Europe/London standard time is GMT and Europe/London Daylight
1060 # Savings Time is GMT+1.) The current implementation of
1061 # time_format.iso_utc_time_to_localseconds() breaks if the timezone is
1062 # Europe/London. (As soon as this unit test is done then I'll change
1063 # that implementation to something that works even in this case...)
1064 origtz = os.environ.get('TZ')
1065 os.environ['TZ'] = "Europe/London"
1066 if hasattr(time, 'tzset'):
1069 return self._help_test_epoch()
1072 del os.environ['TZ']
1074 os.environ['TZ'] = origtz
1075 if hasattr(time, 'tzset'):
1078 def _help_test_epoch(self):
1079 origtzname = time.tzname
1080 s = time_format.iso_utc_time_to_seconds("1970-01-01T00:00:01")
1081 self.failUnlessEqual(s, 1.0)
1082 s = time_format.iso_utc_time_to_seconds("1970-01-01_00:00:01")
1083 self.failUnlessEqual(s, 1.0)
1084 s = time_format.iso_utc_time_to_seconds("1970-01-01 00:00:01")
1085 self.failUnlessEqual(s, 1.0)
1087 self.failUnlessEqual(time_format.iso_utc(1.0), "1970-01-01_00:00:01")
1088 self.failUnlessEqual(time_format.iso_utc(1.0, sep=" "),
1089 "1970-01-01 00:00:01")
1092 isostr = time_format.iso_utc(now)
1093 timestamp = time_format.iso_utc_time_to_seconds(isostr)
1094 self.failUnlessEqual(int(timestamp), int(now))
1098 self.failUnlessEqual(time_format.iso_utc(t=my_time),
1099 "1970-01-01_00:00:01")
1100 e = self.failUnlessRaises(ValueError,
1101 time_format.iso_utc_time_to_seconds,
1102 "invalid timestring")
1103 self.failUnless("not a complete ISO8601 timestamp" in str(e))
1104 s = time_format.iso_utc_time_to_seconds("1970-01-01_00:00:01.500")
1105 self.failUnlessEqual(s, 1.5)
1107 # Look for daylight-savings-related errors.
1108 thatmomentinmarch = time_format.iso_utc_time_to_seconds("2009-03-20 21:49:02.226536")
1109 self.failUnlessEqual(thatmomentinmarch, 1237585742.226536)
1110 self.failUnlessEqual(origtzname, time.tzname)
def test_iso_utc(self):
    # A single fixed instant, rendered as a date and as a full timestamp.
    moment = 1266760143.7841301

    # Date-only rendering: the time given directly, then as a
    # zero-argument callable passed via t=.
    self.failUnlessEqual(time_format.iso_utc_date(moment), "2010-02-21")
    self.failUnlessEqual(time_format.iso_utc_date(t=lambda: moment),
                         "2010-02-21")

    # Full timestamp: first without sep, then with sep="-".
    self.failUnlessEqual(time_format.iso_utc(moment),
                         "2010-02-21_13:49:03.784130")
    self.failUnlessEqual(time_format.iso_utc(moment, sep="-"),
                         "2010-02-21-13:49:03.784130")
1123 def test_parse_duration(self):
1124 p = time_format.parse_duration
1126 self.failUnlessEqual(p("1 day"), DAY)
1127 self.failUnlessEqual(p("2 days"), 2*DAY)
1128 self.failUnlessEqual(p("3 months"), 3*31*DAY)
1129 self.failUnlessEqual(p("4 mo"), 4*31*DAY)
1130 self.failUnlessEqual(p("5 years"), 5*365*DAY)
1131 e = self.failUnlessRaises(ValueError, p, "123")
1132 self.failUnlessIn("no unit (like day, month, or year) in '123'",
def test_parse_date(self):
    # The date string "2010-02-21" must parse to 1266710400.
    parsed = time_format.parse_date("2010-02-21")
    self.failUnlessEqual(parsed, 1266710400)
1138 class CacheDir(unittest.TestCase):
1139 def test_basic(self):
1140 basedir = "test_util/CacheDir/test_basic"
1142 def _failIfExists(name):
1143 absfn = os.path.join(basedir, name)
1144 self.failIf(os.path.exists(absfn),
1145 "%s exists but it shouldn't" % absfn)
1147 def _failUnlessExists(name):
1148 absfn = os.path.join(basedir, name)
1149 self.failUnless(os.path.exists(absfn),
1150 "%s doesn't exist but it should" % absfn)
1152 cdm = cachedir.CacheDirectoryManager(basedir)
1153 a = cdm.get_file("a")
1154 b = cdm.get_file("b")
1155 c = cdm.get_file("c")
1156 f = open(a.get_filename(), "wb"); f.write("hi"); f.close(); del f
1157 f = open(b.get_filename(), "wb"); f.write("hi"); f.close(); del f
1158 f = open(c.get_filename(), "wb"); f.write("hi"); f.close(); del f
1160 _failUnlessExists("a")
1161 _failUnlessExists("b")
1162 _failUnlessExists("c")
1166 _failUnlessExists("a")
1167 _failUnlessExists("b")
1168 _failUnlessExists("c")
1171 # this file won't be deleted yet, because it isn't old enough
1173 _failUnlessExists("a")
1174 _failUnlessExists("b")
1175 _failUnlessExists("c")
1177 # we change the definition of "old" to make everything old
1182 _failUnlessExists("b")
1183 _failUnlessExists("c")
1191 _failUnlessExists("b")
1192 _failUnlessExists("c")
1194 b2 = cdm.get_file("b")
1198 _failUnlessExists("b")
1199 _failUnlessExists("c")
1204 def __init__(self, x):
1209 return "<%s %s>" % (self.__class__.__name__, self.x,)
def __le__(self, other):
    # "<=" is answered by comparing the wrapped value self.x with other.
    return self.x <= other
def __lt__(self, other):
    # "<" is answered by comparing the wrapped value self.x with other.
    return self.x < other
def __ge__(self, other):
    # ">=" is answered by comparing the wrapped value self.x with other.
    return self.x >= other
def __gt__(self, other):
    # ">" is answered by comparing the wrapped value self.x with other.
    return self.x > other
def __ne__(self, other):
    # "!=" is answered by comparing the wrapped value self.x with other.
    return self.x != other
def __eq__(self, other):
    # "==" is answered by comparing the wrapped value self.x with other,
    # so an instance compares equal to whatever its x compares equal to.
    return self.x == other
1225 class DictUtil(unittest.TestCase):
1226 def _help_test_empty_dict(self, klass):
1230 self.failUnless(d1 == d2, "d1: %r, d2: %r" % (d1, d2,))
1231 self.failUnless(len(d1) == 0)
1232 self.failUnless(len(d2) == 0)
1234 def _help_test_nonempty_dict(self, klass):
1235 d1 = klass({'a': 1, 'b': "eggs", 3: "spam",})
1236 d2 = klass({'a': 1, 'b': "eggs", 3: "spam",})
1238 self.failUnless(d1 == d2)
1239 self.failUnless(len(d1) == 3, "%s, %s" % (len(d1), d1,))
1240 self.failUnless(len(d2) == 3)
1242 def _help_test_eq_but_notis(self, klass):
1243 d = klass({'a': 3, 'b': EqButNotIs(3), 'c': 3})
1248 d['b'] = EqButNotIs(3)
1253 d['b'] = EqButNotIs(3)
1259 d['a'] = EqButNotIs(3)
1264 fake3 = EqButNotIs(3)
1265 fake7 = EqButNotIs(7)
1269 self.failUnless(filter(lambda x: x is 8, d.itervalues()))
1270 self.failUnless(filter(lambda x: x is fake7, d.itervalues()))
1271 # The real 7 should have been ejected by the d[3] = 8.
1272 self.failUnless(not filter(lambda x: x is 7, d.itervalues()))
1273 self.failUnless(filter(lambda x: x is fake3, d.iterkeys()))
1274 self.failUnless(filter(lambda x: x is 3, d.iterkeys()))
1279 fake3 = EqButNotIs(3)
1280 fake7 = EqButNotIs(7)
1283 self.failUnless(filter(lambda x: x is 8, d.itervalues()))
1284 self.failUnless(filter(lambda x: x is fake7, d.itervalues()))
1285 # The real 7 should have been ejected by the d[3] = 8.
1286 self.failUnless(not filter(lambda x: x is 7, d.itervalues()))
1287 self.failUnless(filter(lambda x: x is fake3, d.iterkeys()))
1288 self.failUnless(filter(lambda x: x is 3, d.iterkeys()))
1292 self._help_test_eq_but_notis(dictutil.UtilDict)
1293 self._help_test_eq_but_notis(dictutil.NumDict)
1294 self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
1295 self._help_test_nonempty_dict(dictutil.UtilDict)
1296 self._help_test_nonempty_dict(dictutil.NumDict)
1297 self._help_test_nonempty_dict(dictutil.ValueOrderedDict)
1298 self._help_test_eq_but_notis(dictutil.UtilDict)
1299 self._help_test_eq_but_notis(dictutil.NumDict)
1300 self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
1302 def test_dict_of_sets(self):
1303 ds = dictutil.DictOfSets()
1308 self.failUnlessEqual(ds[1], set(["a"]))
1309 self.failUnlessEqual(ds[2], set(["b", "c"]))
1310 ds.discard(3, "d") # should not raise an exception
1312 self.failUnlessEqual(ds[2], set(["c"]))
1314 self.failIf(2 in ds)
1317 ds2 = dictutil.DictOfSets()
1322 self.failUnlessEqual(ds[1], set(["a"]))
1323 self.failUnlessEqual(ds[3], set(["f", "g"]))
1324 self.failUnlessEqual(ds[4], set(["h"]))
def test_move(self):
    # Every case starts from a fresh pair of dicts with the same contents.
    def fresh():
        return ({1: "a", 2: "b"}, {2: "c", 3: "d"})

    # Key 1 is only in the source: it ends up in the destination.
    src, dst = fresh()
    dictutil.move(1, src, dst)
    self.failUnlessEqual(src, {2: "b"})
    self.failUnlessEqual(dst, {1: "a", 2: "c", 3: "d"})

    # Key 2 is in both: the moved value replaces the destination's value.
    src, dst = fresh()
    dictutil.move(2, src, dst)
    self.failUnlessEqual(src, {1: "a"})
    self.failUnlessEqual(dst, {2: "b", 3: "d"})

    # Key 5 is in neither: with strict=True the call must raise KeyError.
    src, dst = fresh()
    self.failUnlessRaises(KeyError, dictutil.move, 5, src, dst, strict=True)
1343 def test_subtract(self):
1344 d1 = {1: "a", 2: "b"}
1345 d2 = {2: "c", 3: "d"}
1346 d3 = dictutil.subtract(d1, d2)
1347 self.failUnlessEqual(d3, {1: "a"})
1349 d1 = {1: "a", 2: "b"}
1351 d3 = dictutil.subtract(d1, d2)
1352 self.failUnlessEqual(d3, {1: "a"})
1354 def test_utildict(self):
1355 d = dictutil.UtilDict({1: "a", 2: "b"})
1358 self.failUnlessEqual(d, {2: "b"})
1361 self.failUnlessRaises(TypeError, eq, d, "not a dict")
1363 d = dictutil.UtilDict({1: "b", 2: "a"})
1364 self.failUnlessEqual(d.items_sorted_by_value(),
1365 [(2, "a"), (1, "b")])
1366 self.failUnlessEqual(d.items_sorted_by_key(),
1367 [(1, "b"), (2, "a")])
1368 self.failUnlessEqual(repr(d), "{1: 'b', 2: 'a'}")
1369 self.failUnless(1 in d)
1371 d2 = dictutil.UtilDict({3: "c", 4: "d"})
1372 self.failUnless(d != d2)
1373 self.failUnless(d2 > d)
1374 self.failUnless(d2 >= d)
1375 self.failUnless(d <= d2)
1376 self.failUnless(d < d2)
1377 self.failUnlessEqual(d[1], "b")
1378 self.failUnlessEqual(sorted(list([k for k in d])), [1,2])
1381 self.failUnlessEqual(d, d3)
1382 self.failUnless(isinstance(d3, dictutil.UtilDict))
1384 d4 = d.fromkeys([3,4], "e")
1385 self.failUnlessEqual(d4, {3: "e", 4: "e"})
1387 self.failUnlessEqual(d.get(1), "b")
1388 self.failUnlessEqual(d.get(3), None)
1389 self.failUnlessEqual(d.get(3, "default"), "default")
1390 self.failUnlessEqual(sorted(list(d.items())),
1391 [(1, "b"), (2, "a")])
1392 self.failUnlessEqual(sorted(list(d.iteritems())),
1393 [(1, "b"), (2, "a")])
1394 self.failUnlessEqual(sorted(d.keys()), [1, 2])
1395 self.failUnlessEqual(sorted(d.values()), ["a", "b"])
1396 x = d.setdefault(1, "new")
1397 self.failUnlessEqual(x, "b")
1398 self.failUnlessEqual(d[1], "b")
1399 x = d.setdefault(3, "new")
1400 self.failUnlessEqual(x, "new")
1401 self.failUnlessEqual(d[3], "new")
1405 self.failUnless(x in [(1, "b"), (2, "a")])
1407 self.failUnless(x in [(1, "b"), (2, "a")])
1408 self.failUnlessRaises(KeyError, d.popitem)
1410 def test_numdict(self):
1411 d = dictutil.NumDict({"a": 1, "b": 2})
1413 d.add_num("a", 10, 5)
1414 d.add_num("c", 20, 5)
1416 self.failUnlessEqual(d, {"a": 11, "b": 2, "c": 25, "d": 30})
1418 d.subtract_num("a", 10)
1419 d.subtract_num("e", 10)
1420 d.subtract_num("f", 10, 15)
1421 self.failUnlessEqual(d, {"a": 1, "b": 2, "c": 25, "d": 30,
1424 self.failUnlessEqual(d.sum(), sum([1, 2, 25, 30, -10, 5]))
1426 d = dictutil.NumDict()
1430 self.failUnlessEqual(d, {"a": 2, "b": 6})
1434 self.failUnlessEqual(d, {"a": 1, "b": 6, "c": -1, "d": 4})
1435 self.failUnlessEqual(d.items_sorted_by_key(),
1436 [("a", 1), ("b", 6), ("c", -1), ("d", 4)])
1437 self.failUnlessEqual(d.items_sorted_by_value(),
1438 [("c", -1), ("a", 1), ("d", 4), ("b", 6)])
1439 self.failUnlessEqual(d.item_with_largest_value(), ("b", 6))
1441 d = dictutil.NumDict({"a": 1, "b": 2})
1442 self.failUnlessEqual(repr(d), "{'a': 1, 'b': 2}")
1443 self.failUnless("a" in d)
1445 d2 = dictutil.NumDict({"c": 3, "d": 4})
1446 self.failUnless(d != d2)
1447 self.failUnless(d2 > d)
1448 self.failUnless(d2 >= d)
1449 self.failUnless(d <= d2)
1450 self.failUnless(d < d2)
1451 self.failUnlessEqual(d["a"], 1)
1452 self.failUnlessEqual(sorted(list([k for k in d])), ["a","b"])
1455 self.failUnlessRaises(TypeError, eq, d, "not a dict")
1458 self.failUnlessEqual(d, d3)
1459 self.failUnless(isinstance(d3, dictutil.NumDict))
1461 d4 = d.fromkeys(["a","b"], 5)
1462 self.failUnlessEqual(d4, {"a": 5, "b": 5})
1464 self.failUnlessEqual(d.get("a"), 1)
1465 self.failUnlessEqual(d.get("c"), 0)
1466 self.failUnlessEqual(d.get("c", 5), 5)
1467 self.failUnlessEqual(sorted(list(d.items())),
1468 [("a", 1), ("b", 2)])
1469 self.failUnlessEqual(sorted(list(d.iteritems())),
1470 [("a", 1), ("b", 2)])
1471 self.failUnlessEqual(sorted(d.keys()), ["a", "b"])
1472 self.failUnlessEqual(sorted(d.values()), [1, 2])
1473 self.failUnless(d.has_key("a"))
1474 self.failIf(d.has_key("c"))
1476 x = d.setdefault("c", 3)
1477 self.failUnlessEqual(x, 3)
1478 self.failUnlessEqual(d["c"], 3)
1479 x = d.setdefault("c", 5)
1480 self.failUnlessEqual(x, 3)
1481 self.failUnlessEqual(d["c"], 3)
1485 self.failUnless(x in [("a", 1), ("b", 2)])
1487 self.failUnless(x in [("a", 1), ("b", 2)])
1488 self.failUnlessRaises(KeyError, d.popitem)
1491 d.update({"c": 4, "d": 5})
1492 self.failUnlessEqual(d, {"c": 4, "d": 5})
def test_del_if_present(self):
    # Key 1 is present and must be removed; key 3 is absent and the call
    # must go through without disturbing the remaining entry.
    sample = {1: "a", 2: "b"}
    for key in (1, 3):
        dictutil.del_if_present(sample, key)
    self.failUnlessEqual(sample, {2: "b"})
1500 def test_valueordereddict(self):
1501 d = dictutil.ValueOrderedDict()
1506 self.failUnlessEqual(d, {"a": 3, "b": 2, "c": 1})
1507 self.failUnlessEqual(d.items(), [("c", 1), ("b", 2), ("a", 3)])
1508 self.failUnlessEqual(d.values(), [1, 2, 3])
1509 self.failUnlessEqual(d.keys(), ["c", "b", "a"])
1510 self.failUnlessEqual(repr(d), "<ValueOrderedDict {c: 1, b: 2, a: 3}>")
1513 self.failIf(d == {"a": 4})
1514 self.failUnless(d != {"a": 4})
1516 x = d.setdefault("d", 0)
1517 self.failUnlessEqual(x, 0)
1518 self.failUnlessEqual(d["d"], 0)
1519 x = d.setdefault("d", -1)
1520 self.failUnlessEqual(x, 0)
1521 self.failUnlessEqual(d["d"], 0)
1523 x = d.remove("e", "default", False)
1524 self.failUnlessEqual(x, "default")
1525 self.failUnlessRaises(KeyError, d.remove, "e", "default", True)
1526 x = d.remove("d", 5)
1527 self.failUnlessEqual(x, 0)
1529 x = d.__getitem__("c")
1530 self.failUnlessEqual(x, 1)
1531 x = d.__getitem__("e", "default", False)
1532 self.failUnlessEqual(x, "default")
1533 self.failUnlessRaises(KeyError, d.__getitem__, "e", "default", True)
1535 self.failUnlessEqual(d.popitem(), ("c", 1))
1536 self.failUnlessEqual(d.popitem(), ("b", 2))
1537 self.failUnlessEqual(d.popitem(), ("a", 3))
1538 self.failUnlessRaises(KeyError, d.popitem)
1540 d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
1541 x = d.pop("d", "default", False)
1542 self.failUnlessEqual(x, "default")
1543 self.failUnlessRaises(KeyError, d.pop, "d", "default", True)
1545 self.failUnlessEqual(x, 2)
1546 self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
1548 d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
1549 x = d.pop_from_list(1) # pop the second item, b/2
1550 self.failUnlessEqual(x, "b")
1551 self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
1553 def test_auxdict(self):
1554 d = dictutil.AuxValueDict()
1555 # we put the serialized form in the auxdata
1556 d.set_with_aux("key", ("filecap", "metadata"), "serialized")
1558 self.failUnlessEqual(d.keys(), ["key"])
1559 self.failUnlessEqual(d["key"], ("filecap", "metadata"))
1560 self.failUnlessEqual(d.get_aux("key"), "serialized")
1561 def _get_missing(key):
1563 self.failUnlessRaises(KeyError, _get_missing, "nonkey")
1564 self.failUnlessEqual(d.get("nonkey"), None)
1565 self.failUnlessEqual(d.get("nonkey", "nonvalue"), "nonvalue")
1566 self.failUnlessEqual(d.get_aux("nonkey"), None)
1567 self.failUnlessEqual(d.get_aux("nonkey", "nonvalue"), "nonvalue")
1569 d["key"] = ("filecap2", "metadata2")
1570 self.failUnlessEqual(d["key"], ("filecap2", "metadata2"))
1571 self.failUnlessEqual(d.get_aux("key"), None)
1573 d.set_with_aux("key2", "value2", "aux2")
1574 self.failUnlessEqual(sorted(d.keys()), ["key", "key2"])
1576 self.failUnlessEqual(d.keys(), ["key"])
1577 self.failIf("key2" in d)
1578 self.failUnlessRaises(KeyError, _get_missing, "key2")
1579 self.failUnlessEqual(d.get("key2"), None)
1580 self.failUnlessEqual(d.get_aux("key2"), None)
1581 d["key2"] = "newvalue2"
1582 self.failUnlessEqual(d.get("key2"), "newvalue2")
1583 self.failUnlessEqual(d.get_aux("key2"), None)
1585 d = dictutil.AuxValueDict({1:2,3:4})
1586 self.failUnlessEqual(sorted(d.keys()), [1,3])
1587 self.failUnlessEqual(d[1], 2)
1588 self.failUnlessEqual(d.get_aux(1), None)
1590 d = dictutil.AuxValueDict([ (1,2), (3,4) ])
1591 self.failUnlessEqual(sorted(d.keys()), [1,3])
1592 self.failUnlessEqual(d[1], 2)
1593 self.failUnlessEqual(d.get_aux(1), None)
1595 d = dictutil.AuxValueDict(one=1, two=2)
1596 self.failUnlessEqual(sorted(d.keys()), ["one","two"])
1597 self.failUnlessEqual(d["one"], 1)
1598 self.failUnlessEqual(d.get_aux("one"), None)
1600 class Pipeline(unittest.TestCase):
1601 def pause(self, *args, **kwargs):
1602 d = defer.Deferred()
1603 self.calls.append( (d, args, kwargs) )
1606 def failUnlessCallsAre(self, expected):
1609 self.failUnlessEqual(len(self.calls), len(expected), self.calls)
1610 for i,c in enumerate(self.calls):
1611 self.failUnlessEqual(c[1:], expected[i], str(i))
1613 def test_basic(self):
1616 p = pipeline.Pipeline(100)
1618 d = p.flush() # fires immediately
1619 d.addCallbacks(finished.append, log.err)
1620 self.failUnlessEqual(len(finished), 1)
1623 d = p.add(10, self.pause, "one")
1624 # the call should start right away, and our return Deferred should
1626 d.addCallbacks(finished.append, log.err)
1627 self.failUnlessEqual(len(finished), 1)
1628 self.failUnlessEqual(finished[0], None)
1629 self.failUnlessCallsAre([ ( ("one",) , {} ) ])
1630 self.failUnlessEqual(p.gauge, 10)
1635 d = p.add(20, self.pause, "two", kw=2)
1636 # pipeline: [one, two]
1638 # the call and the Deferred should fire right away
1639 d.addCallbacks(finished.append, log.err)
1640 self.failUnlessEqual(len(finished), 1)
1641 self.failUnlessEqual(finished[0], None)
1642 self.failUnlessCallsAre([ ( ("one",) , {} ),
1643 ( ("two",) , {"kw": 2} ),
1645 self.failUnlessEqual(p.gauge, 30)
1647 self.calls[0][0].callback("one-result")
1649 self.failUnlessEqual(p.gauge, 20)
1652 d = p.add(90, self.pause, "three", "posarg1")
1653 # pipeline: [two, three]
1656 fd.addCallbacks(flushed.append, log.err)
1657 self.failUnlessEqual(flushed, [])
1659 # the call will be made right away, but the return Deferred will not,
1660 # because the pipeline is now full.
1661 d.addCallbacks(finished.append, log.err)
1662 self.failUnlessEqual(len(finished), 0)
1663 self.failUnlessCallsAre([ ( ("one",) , {} ),
1664 ( ("two",) , {"kw": 2} ),
1665 ( ("three", "posarg1"), {} ),
1667 self.failUnlessEqual(p.gauge, 110)
1669 self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
1671 # retiring either call will unblock the pipeline, causing the #3
1673 self.calls[2][0].callback("three-result")
1676 self.failUnlessEqual(len(finished), 1)
1677 self.failUnlessEqual(finished[0], None)
1678 self.failUnlessEqual(flushed, [])
1680 # retiring call#2 will finally allow the flush() Deferred to fire
1681 self.calls[1][0].callback("two-result")
1682 self.failUnlessEqual(len(flushed), 1)
1684 def test_errors(self):
1686 p = pipeline.Pipeline(100)
1688 d1 = p.add(200, self.pause, "one")
1692 d1.addBoth(finished.append)
1693 self.failUnlessEqual(finished, [])
1696 d2.addBoth(flushed.append)
1697 self.failUnlessEqual(flushed, [])
1699 self.calls[0][0].errback(ValueError("oops"))
1701 self.failUnlessEqual(len(finished), 1)
1703 self.failUnless(isinstance(f, Failure))
1704 self.failUnless(f.check(pipeline.PipelineError))
1705 self.failUnlessIn("PipelineError", str(f.value))
1706 self.failUnlessIn("ValueError", str(f.value))
1708 self.failUnless("ValueError" in r, r)
1710 self.failUnless(f2.check(ValueError))
1712 self.failUnlessEqual(len(flushed), 1)
1714 self.failUnless(isinstance(f, Failure))
1715 self.failUnless(f.check(pipeline.PipelineError))
1717 self.failUnless(f2.check(ValueError))
1719 # now that the pipeline is in the failed state, any new calls will
1722 d3 = p.add(20, self.pause, "two")
1725 d3.addBoth(finished.append)
1726 self.failUnlessEqual(len(finished), 1)
1728 self.failUnless(isinstance(f, Failure))
1729 self.failUnless(f.check(pipeline.PipelineError))
1731 self.failUnless("ValueError" in r, r)
1733 self.failUnless(f2.check(ValueError))
1737 d4.addBoth(flushed.append)
1738 self.failUnlessEqual(len(flushed), 1)
1740 self.failUnless(isinstance(f, Failure))
1741 self.failUnless(f.check(pipeline.PipelineError))
1743 self.failUnless(f2.check(ValueError))
1745 def test_errors2(self):
1747 p = pipeline.Pipeline(100)
1749 d1 = p.add(10, self.pause, "one")
1750 d2 = p.add(20, self.pause, "two")
1751 d3 = p.add(30, self.pause, "three")
1754 # one call fails, then the second one succeeds: make sure
1755 # ExpandableDeferredList tolerates the second one
1758 d4.addBoth(flushed.append)
1759 self.failUnlessEqual(flushed, [])
1761 self.calls[0][0].errback(ValueError("oops"))
1762 self.failUnlessEqual(len(flushed), 1)
1764 self.failUnless(isinstance(f, Failure))
1765 self.failUnless(f.check(pipeline.PipelineError))
1767 self.failUnless(f2.check(ValueError))
1769 self.calls[1][0].callback("two-result")
1770 self.calls[2][0].errback(ValueError("three-error"))
1774 class SampleError(Exception):
1777 class Log(unittest.TestCase):
1779 if not hasattr(self, "flushLoggedErrors"):
1780 # without flushLoggedErrors, we can't get rid of the
1781 # twisted.log.err that tahoe_log records, so we can't keep this
1782 # test from [ERROR]ing
1783 raise unittest.SkipTest("needs flushLoggedErrors from Twisted-2.5.0")
1785 raise SampleError("simple sample")
1788 tahoe_log.err(format="intentional sample error",
1789 failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
1790 self.flushLoggedErrors(SampleError)
1794 # this is a simple+inefficient form of util.spans.Spans . We compare the
1795 # behavior of this reference model against the real (efficient) form.
1797 def __init__(self, _span_or_start=None, length=None):
1799 if length is not None:
1800 for i in range(_span_or_start, _span_or_start+length):
1802 elif _span_or_start:
1803 for (start,length) in _span_or_start:
1804 self.add(start, length)
1806 def add(self, start, length):
1807 for i in range(start, start+length):
1811 def remove(self, start, length):
1812 for i in range(start, start+length):
1813 self._have.discard(i)
1817 return sorted(self._have)
1820 items = sorted(self._have)
1824 if prevstart is None:
1825 prevstart = prevend = i
1830 yield (prevstart, prevend-prevstart+1)
1831 prevstart = prevend = i
1832 if prevstart is not None:
1833 yield (prevstart, prevend-prevstart+1)
1835 def __nonzero__(self): # this gets us bool()
1839 return len(self._have)
1841 def __add__(self, other):
1842 s = self.__class__(self)
1843 for (start, length) in other:
1844 s.add(start, length)
1847 def __sub__(self, other):
1848 s = self.__class__(self)
1849 for (start, length) in other:
1850 s.remove(start, length)
1853 def __iadd__(self, other):
1854 for (start, length) in other:
1855 self.add(start, length)
1858 def __isub__(self, other):
1859 for (start, length) in other:
1860 self.remove(start, length)
1863 def __and__(self, other):
1864 s = self.__class__()
1865 for i in other.each():
1870 def __contains__(self, (start,length)):
1871 for i in range(start, start+length):
1872 if i not in self._have:
1876 class ByteSpans(unittest.TestCase):
1877 def test_basic(self):
1879 self.failUnlessEqual(list(s), [])
1881 self.failIf((0,1) in s)
1882 self.failUnlessEqual(s.len(), 0)
1884 s1 = Spans(3, 4) # 3,4,5,6
1887 s1 = Spans(3L, 4L) # 3,4,5,6
1893 s2.add(10,2) # 10,11
1895 self.failUnless((10,1) in s2)
1896 self.failIf((10,1) in s1)
1897 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
1898 self.failUnlessEqual(s2.len(), 6)
1900 s2.add(15,2).add(20,2)
1901 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
1902 self.failUnlessEqual(s2.len(), 10)
1904 s2.remove(4,3).remove(15,1)
1905 self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
1906 self.failUnlessEqual(s2.len(), 6)
1908 s1 = SimpleSpans(3, 4) # 3 4 5 6
1909 s2 = SimpleSpans(5, 4) # 5 6 7 8
1911 self.failUnlessEqual(list(i.each()), [5, 6])
1913 def _check1(self, s):
1914 self.failUnlessEqual(list(s), [(3,4)])
1916 self.failUnlessEqual(s.len(), 4)
1917 self.failIf((0,1) in s)
1918 self.failUnless((3,4) in s)
1919 self.failUnless((3,1) in s)
1920 self.failUnless((5,2) in s)
1921 self.failUnless((6,1) in s)
1922 self.failIf((6,2) in s)
1923 self.failIf((7,1) in s)
1924 self.failUnlessEqual(list(s.each()), [3,4,5,6])
1926 def test_large(self):
1927 s = Spans(4, 2**65) # don't do this with a SimpleSpans
1928 self.failUnlessEqual(list(s), [(4, 2**65)])
1930 self.failUnlessEqual(s.len(), 2**65)
1931 self.failIf((0,1) in s)
1932 self.failUnless((4,2) in s)
1933 self.failUnless((2**65,2) in s)
1935 def test_math(self):
1936 s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
1937 s2 = Spans(5, 3) # 5,6,7
1938 s3 = Spans(8, 4) # 8,9,10,11
1941 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
1943 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
1945 self.failUnlessEqual(list(s.each()), [5,6,7])
1947 self.failUnlessEqual(list(s.each()), [5,6,7])
1949 self.failUnlessEqual(list(s.each()), [5,6,7])
1951 self.failUnlessEqual(list(s.each()), [8,9])
1953 self.failUnlessEqual(list(s.each()), [8,9])
1955 self.failUnlessEqual(list(s.each()), [])
1957 self.failUnlessEqual(list(s.each()), [])
1959 self.failUnlessEqual(list(s.each()), [])
1961 self.failUnlessEqual(list(s.each()), [])
1964 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
1966 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
1968 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
1972 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
1975 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
1978 self.failUnlessEqual(list(s.each()), [5,6,7])
1982 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
1985 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
1988 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
# Differential ("model vs. real") test: the same hash-derived sequence of
# operations is applied to one object built from S1 and one built from S2,
# and after every step the two must report identical contents.
# NOTE(review): this excerpt omits some lines of the method (the S1/S2/seed
# bindings and the headers of most operation branches), so the exact mix of
# operations cannot be confirmed from here.
1990 def test_random(self):
1991 # attempt to increase coverage of corner cases by comparing behavior
1992 # of a simple-but-slow model implementation against the
1993 # complex-but-fast actual implementation, in a large number of random
1997 s1 = S1(); s2 = S2()
# _create populates one fresh S1 object and one fresh S2 object with the same
# hash-derived (start, length) ranges.
1999 def _create(subseed):
2000 ns1 = S1(); ns2 = S2()
2002 what = _hash(subseed+str(i)).hexdigest()
# two hex digits -> start in 0..255
2003 start = int(what[2:4], 16)
# one hex digit, floored at 1 -> length in 1..15
2004 length = max(1,int(what[5:6], 16))
2005 ns1.add(start, length); ns2.add(start, length)
# Main loop: 1000 steps, each keyed by the hex digest of seed+str(i).
2009 for i in range(1000):
2010 what = _hash(seed+str(i)).hexdigest()
2013 start = int(what[2:4], 16)
2014 length = max(1,int(what[5:6], 16))
# Re-creation variants: empty objects, objects built from (start, length),
# or copies built from the current objects.
2017 if subop in "01234":
2018 s1 = S1(); s2 = S2()
2019 elif subop in "5678":
2020 s1 = S1(start, length); s2 = S2(start, length)
2022 s1 = S1(s1); s2 = S2(s2)
2023 #print "s2 = %s" % s2.dump()
2025 #print "s2.add(%d,%d)" % (start, length)
2026 s1.add(start, length); s2.add(start, length)
2028 #print "s2.remove(%d,%d)" % (start, length)
2029 s1.remove(start, length); s2.remove(start, length)
# The remaining operations combine the current objects with a freshly created
# pair: +, -, +=, -= and &.
2031 ns1, ns2 = _create(what[7:11])
2032 #print "s2 + %s" % ns2.dump()
2033 s1 = s1 + ns1; s2 = s2 + ns2
2035 ns1, ns2 = _create(what[7:11])
2036 #print "%s - %s" % (s2.dump(), ns2.dump())
2037 s1 = s1 - ns1; s2 = s2 - ns2
2039 ns1, ns2 = _create(what[7:11])
2040 #print "s2 += %s" % ns2.dump()
2041 s1 += ns1; s2 += ns2
2043 ns1, ns2 = _create(what[7:11])
2044 #print "%s -= %s" % (s2.dump(), ns2.dump())
2045 s1 -= ns1; s2 -= ns2
2047 ns1, ns2 = _create(what[7:11])
2048 #print "%s &= %s" % (s2.dump(), ns2.dump())
2049 s1 = s1 & ns1; s2 = s2 & ns2
2050 #print "s2 now %s" % s2.dump()
# After the step, both objects must agree on each(), len(), truthiness and
# their iteration output.
2051 self.failUnlessEqual(list(s1.each()), list(s2.each()))
2052 self.failUnlessEqual(s1.len(), s2.len())
2053 self.failUnlessEqual(bool(s1), bool(s2))
2054 self.failUnlessEqual(list(s1), list(s2))
# Containment probes: each probe re-hashes two characters of the previous
# digest, so the probe spans form a chain derived from this step's digest.
2056 what = _hash(what[12:14]+str(j)).hexdigest()
2057 start = int(what[2:4], 16)
2058 length = max(1, int(what[5:6], 16))
2059 span = (start, length)
2060 self.failUnlessEqual(bool(span in s1), bool(span in s2))
2066 # s.add(start,length) : returns s
2067 # s.remove(start,length)
2068 # s.each() -> list of byte offsets, mostly for testing
2069 # list(s) -> list of (start,length) tuples, one per span
2070 # (start,length) in s -> True if (start..start+length-1) are all members
2071 # NOT equivalent to x in list(s)
2072 # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
2073 # bool(s) (__nonzero__)
2074 # s = s1+s2, s1-s2, +=s1, -=s1
# Drives _test_overlap with values a, b, c, d.
# NOTE(review): the code that produces a, b, c and d is not visible in this
# excerpt; presumably nested loops over small ranges -- confirm against the
# full file.
2076 def test_overlap(self):
2081 self._test_overlap(a,b,c,d)
# Cross-checks overlap(a,b,c,d) against plain set arithmetic: range(a,a+b)
# and range(c,c+d) are expanded into sets of integers and intersected to get
# the expected result.
# NOTE(review): the branch headers and the line that binds start/length are
# not visible in this excerpt; presumably the None check applies when the
# intersection is empty and start/length are unpacked from `o` otherwise.
2083 def _test_overlap(self, a, b, c, d):
2084 s1 = set(range(a,a+b))
2085 s2 = set(range(c,c+d))
2087 #self._show_overlap(s1, "1")
2088 #self._show_overlap(s2, "2")
2089 o = overlap(a,b,c,d)
2090 expected = s1.intersection(s2)
# In this branch overlap() must have returned None.
2092 self.failUnlessEqual(o, None)
# In this branch the reported (start, length) range, expanded to a set, must
# equal the set intersection exactly.
2095 so = set(range(start,start+length))
2096 #self._show(so, "o")
2097 self.failUnlessEqual(so, expected)
# Debugging helper; its only visible callers are commented out in
# _test_overlap. Takes a set `s` and a second argument `c`.
# NOTE(review): most of the body is not visible in this excerpt, so the use
# of `c` and the output destination cannot be confirmed here.
2099 def _show_overlap(self, s, c):
# NOTE(review): range(max(s)) stops at max(s)-1, so the largest element of
# `s` is never visited by this loop.
2103 for i in range(max(s)):
def extend(s, start, length, fill):
    """Right-pad ``s`` with ``fill`` until it reaches ``start + length``.

    ``s`` is returned unchanged when it is already at least
    ``start + length`` long. Otherwise ``fill`` must be a single character
    (enforced with an assertion) and is repeated to make up the difference.
    """
    shortfall = (start + length) - len(s)
    if shortfall <= 0:
        # Long enough already: return the input untouched, and before the
        # fill check, so an over-long ``fill`` is tolerated in this case.
        return s
    assert len(fill) == 1
    return s + fill * shortfall
def replace(s, start, data):
    """Return a copy of ``s`` with ``data`` written over it at ``start``.

    The result has the same length as ``s``: the characters at
    ``start .. start+len(data)-1`` are replaced by ``data``. ``s`` must
    already be long enough to hold the written range (enforced with an
    assertion).
    """
    end = start + len(data)
    assert len(s) >= end
    head = s[:start]
    tail = s[end:]
    return head + data + tail
# String-backed model of a sparse data store. It keeps a flag string
# (self.missing) alongside a data string (self.data), both indexed by offset
# and maintained with the module-level extend()/replace() helpers.
# NOTE(review): some lines of this class (a few method headers, return
# statements and continuation lines) are not visible in this excerpt.
2120 class SimpleDataSpans:
# Optionally starts as a copy of `other`, re-adding every chunk that
# other.get_chunks() reports.
2121 def __init__(self, other=None):
2122 self.missing = "" # "1" where missing, "0" where found
2125 for (start, data) in other.get_chunks():
2126 self.add(start, data)
2128 def __nonzero__(self): # this gets us bool()
# Number of flags that are not "1", i.e. the number of stored offsets.
2131 return len(self.missing.replace("1", ""))
# List of every offset whose flag is "0".
2133 return [i for (i,c) in enumerate(self.missing) if c == "0"]
# Availability test for the range start..start+length-1.
2134 def _have(self, start, length):
2135 m = self.missing[start:start+length]
# The condition is true when the slice is empty, shorter than requested (the
# range runs past the end of the flag string), or contains a "1" (int(m) is
# then nonzero).
2136 if not m or len(m)<length or int(m):
# Generator of (offset, character) pairs, one per offset listed by _dump().
2139 def get_chunks(self):
2140 for i in self._dump():
2141 yield (i, self.data[i])
# Wraps the (start, len(data)) pair of every chunk in a SimpleSpans object.
2142 def get_spans(self):
2143 return SimpleSpans([(start,len(data))
2144 for (start,data) in self.get_chunks()])
# Returns the stored slice when _have() accepts the range.
# NOTE(review): the fall-through result is not visible here; the tests in
# this file compare it to None.
2145 def get(self, start, length):
2146 if self._have(start, length):
2147 return self.data[start:start+length]
# get() followed by remove() of the same range.
# NOTE(review): the guard around remove() and the return statement are not
# visible in this excerpt.
2149 def pop(self, start, length):
2150 data = self.get(start, length)
2152 self.remove(start, length)
# Extends the flag string with "1" as needed, then rewrites it via replace().
# NOTE(review): the remaining replace() arguments are on a line that is not
# visible here.
2154 def remove(self, start, length):
2155 self.missing = replace(extend(self.missing, start, length, "1"),
# Flags the written range as "0" and stores `data` at `start`; both strings
# are first grown as needed ("1" padding for flags, " " padding for data).
2157 def add(self, start, data):
2158 self.missing = replace(extend(self.missing, start, len(data), "1"),
2159 start, "0"*len(data))
2160 self.data = replace(extend(self.data, start, len(data), " "),
2164 class StringSpans(unittest.TestCase):
# Scripted scenario shared by test_test and test_basic, which call it with
# SimpleDataSpans and DataSpans respectively as `klass`.
# NOTE(review): the lines that create ds/ds2/s1 and add data are not visible
# in this excerpt; the comments below describe only what the visible
# assertions require.
2165 def do_basic(self, klass):
# Initial state: no bytes, no offsets, no chunks, and get/pop return None.
2167 self.failUnlessEqual(ds.len(), 0)
2168 self.failUnlessEqual(list(ds._dump()), [])
2169 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
2171 self.failUnlessEqual(ds.get(0, 4), None)
2172 self.failUnlessEqual(ds.pop(0, 4), None)
# Next state: exactly offsets 2..5 are present (4 bytes in total).
2176 self.failUnlessEqual(ds.len(), 4)
2177 self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
2178 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
2180 self.failUnless((2,2) in s1)
# Ranges that are only partly present must yield None.
2181 self.failUnlessEqual(ds.get(0, 4), None)
2182 self.failUnlessEqual(ds.pop(0, 4), None)
2183 self.failUnlessEqual(ds.get(4, 4), None)
# ds2 starts with the same contents as ds.
2186 self.failUnlessEqual(ds2.len(), 4)
2187 self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
2188 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
2189 self.failUnlessEqual(ds2.get(0, 4), None)
2190 self.failUnlessEqual(ds2.pop(0, 4), None)
# Popping from ds2 removes the data from ds2 only; ds still has all 4 bytes.
2191 self.failUnlessEqual(ds2.pop(2, 3), "fou")
2192 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 1)
2193 self.failUnlessEqual(ds2.get(2, 3), None)
2194 self.failUnlessEqual(ds2.get(5, 1), "r")
2195 self.failUnlessEqual(ds.get(2, 3), "fou")
2196 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
# Next state: offsets 0..5 are all present, so a range starting at 0 can now
# be read and popped.
2199 self.failUnlessEqual(ds.len(), 6)
2200 self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
2201 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
2202 self.failUnlessEqual(ds.get(0, 4), "23fo")
2203 self.failUnlessEqual(ds.pop(0, 4), "23fo")
2204 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 2)
2205 self.failUnlessEqual(ds.get(0, 4), None)
2206 self.failUnlessEqual(ds.pop(0, 4), None)
2211 self.failUnlessEqual(ds.get(2, 4), "fear")
# Same lookup with Python 2 long integers as arguments.
2216 self.failUnlessEqual(ds.get(2L, 4L), "fear")
# Exhaustive scan shared by test_test and test_basic: for every
# [start, end) range within l, data from S is added to a baseline object
# `b`, and the visible checks then verify reads and removals.
# NOTE(review): many lines of this method are not visible in this excerpt,
# including the rest of `pieces`, the definitions of `l`, `p`, `b`, `b2` and
# `dump`, and several continuation lines.
2219 def do_scan(self, klass):
2220 # do a test with gaps and spans of size 1 and 2
2221 # left=(1,11) * right=(1,11) * gapsize=(1,2)
2222 # 111, 112, 121, 122, 211, 212, 221, 222
2231 # 11 1 1 11 11 11 1 1 111
2232 # 0123456789012345678901234567
2233 # abcdefghijklmnopqrstuvwxyz-=
2234 pieces = [(1, "bc"),
# presumably the offsets covered by `pieces` -- only the first piece is
# visible here, so verify against the full list
2244 p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
# Reference string: S[i] is the character expected at offset i.
2245 S = "abcdefghijklmnopqrstuvwxyz-="
2246 # TODO: when adding data, add capital letters, to make sure we aren't
2247 # just leaving the old data in place
2251 for start, data in pieces:
# One character per offset: a space where `i not in p`, else S[i].
2256 d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
2260 for start in range(0, l):
2261 for end in range(start+1, l):
2262 # add [start-end) to the baseline
2263 which = "%d-%d" % (start, end-1)
2264 p_added = set(range(start, end))
# NOTE(review): this is a live print statement; whether it sits behind a
# debug guard cannot be seen in this excerpt.
2268 print dump(b), which
2269 add = klass(); add.add(start, S[start:end])
2271 b.add(start, S[start:end])
2274 # check that the new span is there
2275 d = b.get(start, end-start)
2276 self.failUnlessEqual(d, S[start:end], which)
2277 # check that all the original pieces are still there
2278 for t_start, t_data in pieces:
2280 self.failUnlessEqual(b.get(t_start, t_len),
2281 S[t_start:t_start+t_len],
2282 "%s %d+%d" % (which, t_start, t_len))
2283 # check that a lot of subspans are mostly correct
2284 for t_start in range(l):
2285 for t_len in range(1,4):
2286 d = b.get(t_start, t_len)
2288 which2 = "%s+(%d-%d)" % (which, t_start,
2290 self.failUnlessEqual(d, S[t_start:t_start+t_len],
2292 # check that removing a subspan gives the right value
2294 b2.remove(t_start, t_len)
2295 removed = set(range(t_start, t_start+t_len))
# Offset i is expected to be present if it was in the baseline or the added
# range, and was not removed.
2297 exp = (((i in p_elements) or (i in p_added))
2298 and (i not in removed))
2299 which2 = "%s-(%d-%d)" % (which, t_start,
2301 self.failUnlessEqual(bool(b2.get(i, 1)), exp,
def test_test(self):
    # Run the two shared scenarios against the SimpleDataSpans model itself;
    # test_basic runs the same scenarios against DataSpans.
    klass = SimpleDataSpans
    self.do_basic(klass)
    self.do_scan(klass)
def test_basic(self):
    # Run the two shared scenarios (basic script, then exhaustive scan)
    # against the DataSpans implementation.
    klass = DataSpans
    self.do_basic(klass)
    self.do_scan(klass)
# Differential test for data-carrying spans: a SimpleDataSpans object (via
# S1) and an object built from S2 receive the same hash-derived sequence of
# operations and must agree after every step.
# NOTE(review): the S2/seed bindings, the initial values used by _randstr,
# and some branch headers are not visible in this excerpt.
2312 def test_random(self):
2313 # attempt to increase coverage of corner cases by comparing behavior
2314 # of a simple-but-slow model implementation against the
2315 # complex-but-fast actual implementation, in a large number of random
2317 S1 = SimpleDataSpans
2319 s1 = S1(); s2 = S2()
# _randstr returns `length` characters of hex text derived from `seed`, by
# concatenating hash digests until enough characters exist and truncating.
2321 def _randstr(length, seed):
2324 while created < length:
2325 piece = _hash(seed + str(created)).hexdigest()
2326 pieces.append(piece)
2327 created += len(piece)
2328 return "".join(pieces)[:length]
# _create populates one fresh object per implementation with identical data:
# both add() calls use the same start and the same _randstr arguments.
2329 def _create(subseed):
2330 ns1 = S1(); ns2 = S2()
2332 what = _hash(subseed+str(i)).hexdigest()
# two hex digits -> start in 0..255
2333 start = int(what[2:4], 16)
# one hex digit, floored at 1 -> length in 1..15
2334 length = max(1,int(what[5:6], 16))
2335 ns1.add(start, _randstr(length, what[7:9]));
2336 ns2.add(start, _randstr(length, what[7:9]))
# Main loop: 1000 steps, each keyed by the hex digest of seed+str(i).
2340 for i in range(1000):
2341 what = _hash(seed+str(i)).hexdigest()
2344 start = int(what[2:4], 16)
2345 length = max(1,int(what[5:6], 16))
# Re-creation: either two empty objects or a freshly populated pair.
2348 if subop in "0123456":
2349 s1 = S1(); s2 = S2()
2351 s1, s2 = _create(what[7:11])
2352 #print "s2 = %s" % list(s2._dump())
# op in 1-6: add identical data to both objects.
2353 elif op in "123456":
2354 #print "s2.add(%d,%d)" % (start, length)
2355 s1.add(start, _randstr(length, what[7:9]));
2356 s2.add(start, _randstr(length, what[7:9]))
# op in 7-9/a-c: remove the same range from both objects.
2357 elif op in "789abc":
2358 #print "s2.remove(%d,%d)" % (start, length)
2359 s1.remove(start, length); s2.remove(start, length)
# Otherwise: pop the same range from both and compare the returned data.
2361 #print "s2.pop(%d,%d)" % (start, length)
2362 d1 = s1.pop(start, length); d2 = s2.pop(start, length)
2363 self.failUnlessEqual(d1, d2)
2364 #print "s1 now %s" % list(s1._dump())
2365 #print "s2 now %s" % list(s2._dump())
# After the step, both objects must agree on len() and on _dump().
2366 self.failUnlessEqual(s1.len(), s2.len())
2367 self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
# 100 read probes per step; each probe re-hashes two characters of the
# previous digest, so the probe ranges form a chain.
2368 for j in range(100):
2369 what = _hash(what[12:14]+str(j)).hexdigest()
2370 start = int(what[2:4], 16)
2371 length = max(1, int(what[5:6], 16))
2372 d1 = s1.get(start, length); d2 = s2.get(start, length)
2373 self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))