def foo(): pass # keep the line number constant
+import os, time, sys
+from StringIO import StringIO
from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
+from twisted.python import log
+from pycryptopp.hash.sha256 import SHA256 as _hash
-from allmydata.util import bencode, idlib, humanreadable
+from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
+from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
+from allmydata.util import limiter, time_format, pollmixin, cachedir
+from allmydata.util import statistics, dictutil, pipeline
+from allmydata.util import log as tahoe_log
+from allmydata.util.spans import Spans, overlap, DataSpans
-
-class IDLib(unittest.TestCase):
+class Base32(unittest.TestCase):
+ def test_b2a_matches_Pythons(self):
+ import base64
+ y = "\x12\x34\x45\x67\x89\x0a\xbc\xde\xf0"
+ x = base64.b32encode(y)
+ while x and x[-1] == '=':
+ x = x[:-1]
+ x = x.lower()
+ self.failUnlessEqual(base32.b2a(y), x)
def test_b2a(self):
- self.failUnlessEqual(idlib.b2a("\x12\x34"), "ci2a====")
+ self.failUnlessEqual(base32.b2a("\x12\x34"), "ci2a")
def test_b2a_or_none(self):
- self.failUnlessEqual(idlib.b2a_or_none(None), None)
- self.failUnlessEqual(idlib.b2a_or_none("\x12\x34"), "ci2a====")
+ self.failUnlessEqual(base32.b2a_or_none(None), None)
+ self.failUnlessEqual(base32.b2a_or_none("\x12\x34"), "ci2a")
def test_a2b(self):
- self.failUnlessEqual(idlib.a2b("ci2a===="), "\x12\x34")
- self.failUnlessRaises(TypeError, idlib.a2b, "bogus")
- def test_peerid(self):
- # these are 160-bit numbers
- peerid = "\x80" + "\x00" * 19
- short = idlib.peerid_to_short_string(peerid)
- self.failUnlessEqual(short, "qaaa")
+ self.failUnlessEqual(base32.a2b("ci2a"), "\x12\x34")
+ self.failUnlessRaises(AssertionError, base32.a2b, "b0gus")
+
+class IDLib(unittest.TestCase):
+ def test_nodeid_b2a(self):
+ self.failUnlessEqual(idlib.nodeid_b2a("\x00"*20), "a"*32)
class NoArgumentException(Exception):
def __init__(self):
self.failUnlessEqual(hr([1,2]), "[1, 2]")
self.failUnlessEqual(hr({1:2}), "{1:2}")
try:
- raise RuntimeError
+ raise ValueError
except Exception, e:
self.failUnless(
- hr(e) == "<RuntimeError: ()>" # python-2.4
- or hr(e) == "RuntimeError()") # python-2.5
+ hr(e) == "<ValueError: ()>" # python-2.4
+ or hr(e) == "ValueError()") # python-2.5
try:
- raise RuntimeError("oops")
+ raise ValueError("oops")
except Exception, e:
self.failUnless(
- hr(e) == "<RuntimeError: 'oops'>" # python-2.4
- or hr(e) == "RuntimeError('oops',)") # python-2.5
+ hr(e) == "<ValueError: 'oops'>" # python-2.4
+ or hr(e) == "ValueError('oops',)") # python-2.5
try:
raise NoArgumentException
except Exception, e:
class MyList(list):
pass
-class Bencode(unittest.TestCase):
- def test_bencode(self):
- e = bencode.bencode
- self.failUnlessEqual(e(4), "i4e")
- self.failUnlessEqual(e([1,2]), "li1ei2ee")
- self.failUnlessEqual(e(MyList([1,2])), "li1ei2ee")
- self.failUnlessEqual(e({1:2}), "di1ei2ee")
- self.failUnlessEqual(e(u"a"), "u1:a")
- self.failUnlessEqual(e([True,False]), "lb1b0e")
- self.failUnlessEqual(e(1.5), "f1.5e")
- self.failUnlessEqual(e("foo"), "3:foo")
- d = bencode.bdecode
- self.failUnlessEqual(d("li1ei2ee"), [1,2])
- self.failUnlessEqual(d("u1:a"), u"a")
- self.failUnlessRaises(ValueError, d, "u10:short")
- self.failUnlessEqual(d("lb1b0e"), [True,False])
- self.failUnlessRaises(ValueError, d, "b2")
- self.failUnlessEqual(d("f1.5e"), 1.5)
- self.failUnlessEqual(d("3:foo"), "foo")
- self.failUnlessRaises(ValueError, d,
- "38:When doing layout, always plan ah")
- # ooh! fascinating! bdecode requires string keys! I think this ought
- # to be changed
- #self.failUnlessEqual(d("di1ei2ee"), {1:2})
- self.failUnlessEqual(d("d1:ai2eu1:bi3ee"), {"a":2, u"b":3})
- self.failUnlessRaises(ValueError, d, "di1ei2ee")
- self.failUnlessRaises(ValueError, d, "d1:ai1e1:ai2ee")
-
- self.failUnlessRaises(ValueError, d, "i1ei2e")
-
- # now run all the module's builtin tests
- bencode.test_decode_raw_string()
- bencode.test_encode_and_decode_unicode_results_in_unicode_type()
- bencode.test_encode_and_decode_unicode_at_least_preserves_the_content_even_if_it_flattens_the_type()
- bencode.test_dict_forbids_non_string_key()
- bencode.test_dict_forbids_key_repeat()
- bencode.test_empty_dict()
- bencode.test_dict_allows_unicode_keys()
- bencode.test_ValueError_in_decode_unknown()
- bencode.test_encode_and_decode_none()
- bencode.test_encode_and_decode_long()
- bencode.test_encode_and_decode_int()
- bencode.test_encode_and_decode_float()
- bencode.test_encode_and_decode_bool()
- #bencode.test_decode_noncanonical_int()
- bencode.test_encode_and_decode_dict()
- bencode.test_encode_and_decode_list()
- bencode.test_encode_and_decode_tuple()
- bencode.test_encode_and_decode_empty_dict()
- bencode.test_encode_and_decode_complex_object()
- bencode.test_unfinished_list()
- bencode.test_unfinished_dict()
- bencode.test_unsupported_type()
+class Math(unittest.TestCase):
+ def test_div_ceil(self):
+ f = mathutil.div_ceil
+ self.failUnlessEqual(f(0, 1), 0)
+ self.failUnlessEqual(f(0, 2), 0)
+ self.failUnlessEqual(f(0, 3), 0)
+ self.failUnlessEqual(f(1, 3), 1)
+ self.failUnlessEqual(f(2, 3), 1)
+ self.failUnlessEqual(f(3, 3), 1)
+ self.failUnlessEqual(f(4, 3), 2)
+ self.failUnlessEqual(f(5, 3), 2)
+ self.failUnlessEqual(f(6, 3), 2)
+ self.failUnlessEqual(f(7, 3), 3)
+
+ def test_next_multiple(self):
+ f = mathutil.next_multiple
+ self.failUnlessEqual(f(5, 1), 5)
+ self.failUnlessEqual(f(5, 2), 6)
+ self.failUnlessEqual(f(5, 3), 6)
+ self.failUnlessEqual(f(5, 4), 8)
+ self.failUnlessEqual(f(5, 5), 5)
+ self.failUnlessEqual(f(5, 6), 6)
+ self.failUnlessEqual(f(32, 1), 32)
+ self.failUnlessEqual(f(32, 2), 32)
+ self.failUnlessEqual(f(32, 3), 33)
+ self.failUnlessEqual(f(32, 4), 32)
+ self.failUnlessEqual(f(32, 5), 35)
+ self.failUnlessEqual(f(32, 6), 36)
+ self.failUnlessEqual(f(32, 7), 35)
+ self.failUnlessEqual(f(32, 8), 32)
+ self.failUnlessEqual(f(32, 9), 36)
+ self.failUnlessEqual(f(32, 10), 40)
+ self.failUnlessEqual(f(32, 11), 33)
+ self.failUnlessEqual(f(32, 12), 36)
+ self.failUnlessEqual(f(32, 13), 39)
+ self.failUnlessEqual(f(32, 14), 42)
+ self.failUnlessEqual(f(32, 15), 45)
+ self.failUnlessEqual(f(32, 16), 32)
+ self.failUnlessEqual(f(32, 17), 34)
+ self.failUnlessEqual(f(32, 18), 36)
+ self.failUnlessEqual(f(32, 589), 589)
+
+ def test_pad_size(self):
+ f = mathutil.pad_size
+ self.failUnlessEqual(f(0, 4), 0)
+ self.failUnlessEqual(f(1, 4), 3)
+ self.failUnlessEqual(f(2, 4), 2)
+ self.failUnlessEqual(f(3, 4), 1)
+ self.failUnlessEqual(f(4, 4), 0)
+ self.failUnlessEqual(f(5, 4), 3)
+
+ def test_is_power_of_k(self):
+ f = mathutil.is_power_of_k
+ for i in range(1, 100):
+ if i in (1, 2, 4, 8, 16, 32, 64):
+ self.failUnless(f(i, 2), "but %d *is* a power of 2" % i)
+ else:
+ self.failIf(f(i, 2), "but %d is *not* a power of 2" % i)
+ for i in range(1, 100):
+ if i in (1, 3, 9, 27, 81):
+ self.failUnless(f(i, 3), "but %d *is* a power of 3" % i)
+ else:
+ self.failIf(f(i, 3), "but %d is *not* a power of 3" % i)
+
+ def test_next_power_of_k(self):
+ f = mathutil.next_power_of_k
+ self.failUnlessEqual(f(0,2), 1)
+ self.failUnlessEqual(f(1,2), 1)
+ self.failUnlessEqual(f(2,2), 2)
+ self.failUnlessEqual(f(3,2), 4)
+ self.failUnlessEqual(f(4,2), 4)
+ for i in range(5, 8): self.failUnlessEqual(f(i,2), 8, "%d" % i)
+ for i in range(9, 16): self.failUnlessEqual(f(i,2), 16, "%d" % i)
+ for i in range(17, 32): self.failUnlessEqual(f(i,2), 32, "%d" % i)
+ for i in range(33, 64): self.failUnlessEqual(f(i,2), 64, "%d" % i)
+ for i in range(65, 100): self.failUnlessEqual(f(i,2), 128, "%d" % i)
+
+ self.failUnlessEqual(f(0,3), 1)
+ self.failUnlessEqual(f(1,3), 1)
+ self.failUnlessEqual(f(2,3), 3)
+ self.failUnlessEqual(f(3,3), 3)
+ for i in range(4, 9): self.failUnlessEqual(f(i,3), 9, "%d" % i)
+ for i in range(10, 27): self.failUnlessEqual(f(i,3), 27, "%d" % i)
+ for i in range(28, 81): self.failUnlessEqual(f(i,3), 81, "%d" % i)
+ for i in range(82, 200): self.failUnlessEqual(f(i,3), 243, "%d" % i)
+
+ def test_ave(self):
+ f = mathutil.ave
+ self.failUnlessEqual(f([1,2,3]), 2)
+ self.failUnlessEqual(f([0,0,0,4]), 1)
+ self.failUnlessAlmostEqual(f([0.0, 1.0, 1.0]), .666666666666)
+
+ def test_round_sigfigs(self):
+ f = mathutil.round_sigfigs
+ self.failUnlessEqual(f(22.0/3, 4), 7.3330000000000002)
+
+class Statistics(unittest.TestCase):
+ def should_assert(self, msg, func, *args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ self.fail(msg)
+ except AssertionError:
+ pass
+
+ def failUnlessListEqual(self, a, b, msg = None):
+ self.failUnlessEqual(len(a), len(b))
+ for i in range(len(a)):
+ self.failUnlessEqual(a[i], b[i], msg)
+
+ def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
+ self.failUnlessEqual(len(a), len(b))
+ for i in range(len(a)):
+ self.failUnlessAlmostEqual(a[i], b[i], places, msg)
+
+ def test_binomial_coeff(self):
+ f = statistics.binomial_coeff
+ self.failUnlessEqual(f(20, 0), 1)
+ self.failUnlessEqual(f(20, 1), 20)
+ self.failUnlessEqual(f(20, 2), 190)
+ self.failUnlessEqual(f(20, 8), f(20, 12))
+ self.should_assert("Should assert if n < k", f, 2, 3)
+
+ def test_binomial_distribution_pmf(self):
+ f = statistics.binomial_distribution_pmf
+
+ pmf_comp = f(2, .1)
+ pmf_stat = [0.81, 0.18, 0.01]
+ self.failUnlessListAlmostEqual(pmf_comp, pmf_stat)
+
+ # Summing across a PMF should give the total probability 1
+ self.failUnlessAlmostEqual(sum(pmf_comp), 1)
+ self.should_assert("Should assert if not 0<=p<=1", f, 1, -1)
+ self.should_assert("Should assert if n < 1", f, 0, .1)
+
+ out = StringIO()
+ statistics.print_pmf(pmf_comp, out=out)
+ lines = out.getvalue().splitlines()
+ self.failUnlessEqual(lines[0], "i=0: 0.81")
+ self.failUnlessEqual(lines[1], "i=1: 0.18")
+ self.failUnlessEqual(lines[2], "i=2: 0.01")
+
+ def test_survival_pmf(self):
+ f = statistics.survival_pmf
+ # Cross-check binomial-distribution method against convolution
+ # method.
+ p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
+ pmf1 = statistics.survival_pmf_via_conv(p_list)
+ pmf2 = statistics.survival_pmf_via_bd(p_list)
+ self.failUnlessListAlmostEqual(pmf1, pmf2)
+ self.failUnlessTrue(statistics.valid_pmf(pmf1))
+ self.should_assert("Should assert if p_i > 1", f, [1.1]);
+ self.should_assert("Should assert if p_i < 0", f, [-.1]);
+
+ def test_repair_count_pmf(self):
+ survival_pmf = statistics.binomial_distribution_pmf(5, .9)
+ repair_pmf = statistics.repair_count_pmf(survival_pmf, 3)
+ # repair_pmf[0] == sum(survival_pmf[0,1,2,5])
+ # repair_pmf[1] == survival_pmf[4]
+ # repair_pmf[2] = survival_pmf[3]
+ self.failUnlessListAlmostEqual(repair_pmf,
+ [0.00001 + 0.00045 + 0.0081 + 0.59049,
+ .32805,
+ .0729,
+ 0, 0, 0])
+
+ def test_repair_cost(self):
+ survival_pmf = statistics.binomial_distribution_pmf(5, .9)
+ bwcost = statistics.bandwidth_cost_function
+ cost = statistics.mean_repair_cost(bwcost, 1000,
+ survival_pmf, 3, ul_dl_ratio=1.0)
+ self.failUnlessAlmostEqual(cost, 558.90)
+ cost = statistics.mean_repair_cost(bwcost, 1000,
+ survival_pmf, 3, ul_dl_ratio=8.0)
+ self.failUnlessAlmostEqual(cost, 1664.55)
+
+ # I haven't manually checked the math beyond here -warner
+ cost = statistics.eternal_repair_cost(bwcost, 1000,
+ survival_pmf, 3,
+ discount_rate=0, ul_dl_ratio=1.0)
+ self.failUnlessAlmostEqual(cost, 65292.056074766246)
+ cost = statistics.eternal_repair_cost(bwcost, 1000,
+ survival_pmf, 3,
+ discount_rate=0.05,
+ ul_dl_ratio=1.0)
+ self.failUnlessAlmostEqual(cost, 9133.6097158191551)
+
+ def test_convolve(self):
+ f = statistics.convolve
+ v1 = [ 1, 2, 3 ]
+ v2 = [ 4, 5, 6 ]
+ v3 = [ 7, 8 ]
+ v1v2result = [ 4, 13, 28, 27, 18 ]
+ # Convolution is commutative
+ r1 = f(v1, v2)
+ r2 = f(v2, v1)
+ self.failUnlessListEqual(r1, r2, "Convolution should be commutative")
+ self.failUnlessListEqual(r1, v1v2result, "Didn't match known result")
+ # Convolution is associative
+ r1 = f(f(v1, v2), v3)
+ r2 = f(v1, f(v2, v3))
+ self.failUnlessListEqual(r1, r2, "Convolution should be associative")
+ # Convolution is distributive
+ r1 = f(v3, [ a + b for a, b in zip(v1, v2) ])
+ tmp1 = f(v3, v1)
+ tmp2 = f(v3, v2)
+ r2 = [ a + b for a, b in zip(tmp1, tmp2) ]
+ self.failUnlessListEqual(r1, r2, "Convolution should be distributive")
+ # Convolution is scalar multiplication associative
+ tmp1 = f(v1, v2)
+ r1 = [ a * 4 for a in tmp1 ]
+ tmp2 = [ a * 4 for a in v1 ]
+ r2 = f(tmp2, v2)
+ self.failUnlessListEqual(r1, r2, "Convolution should be scalar multiplication associative")
+
+ def test_find_k(self):
+ f = statistics.find_k
+ g = statistics.pr_file_loss
+ plist = [.9] * 10 + [.8] * 10 # N=20
+ t = .0001
+ k = f(plist, t)
+ self.failUnlessEqual(k, 10)
+ self.failUnless(g(plist, k) < t)
+
+ def test_pr_file_loss(self):
+ f = statistics.pr_file_loss
+ plist = [.5] * 10
+ self.failUnlessEqual(f(plist, 3), .0546875)
+
+ def test_pr_backup_file_loss(self):
+ f = statistics.pr_backup_file_loss
+ plist = [.5] * 10
+ self.failUnlessEqual(f(plist, .5, 3), .02734375)
+
+
+class Asserts(unittest.TestCase):
+ def should_assert(self, func, *args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except AssertionError, e:
+ return str(e)
+ except Exception, e:
+ self.fail("assert failed with non-AssertionError: %s" % e)
+ self.fail("assert was not caught")
+
+ def should_not_assert(self, func, *args, **kwargs):
+ try:
+ func(*args, **kwargs)
+ except AssertionError, e:
+ self.fail("assertion fired when it should not have: %s" % e)
+ except Exception, e:
+ self.fail("assertion (which shouldn't have failed) failed with non-AssertionError: %s" % e)
+ return # we're happy
+
+
+ def test_assert(self):
+ f = assertutil._assert
+ self.should_assert(f)
+ self.should_assert(f, False)
+ self.should_not_assert(f, True)
+
+ m = self.should_assert(f, False, "message")
+ self.failUnlessEqual(m, "'message' <type 'str'>", m)
+ m = self.should_assert(f, False, "message1", othermsg=12)
+ self.failUnlessEqual("'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
+ m = self.should_assert(f, False, othermsg="message2")
+ self.failUnlessEqual("othermsg: 'message2' <type 'str'>", m)
+
+ def test_precondition(self):
+ f = assertutil.precondition
+ self.should_assert(f)
+ self.should_assert(f, False)
+ self.should_not_assert(f, True)
+
+ m = self.should_assert(f, False, "message")
+ self.failUnlessEqual("precondition: 'message' <type 'str'>", m)
+ m = self.should_assert(f, False, "message1", othermsg=12)
+ self.failUnlessEqual("precondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
+ m = self.should_assert(f, False, othermsg="message2")
+ self.failUnlessEqual("precondition: othermsg: 'message2' <type 'str'>", m)
+
+ def test_postcondition(self):
+ f = assertutil.postcondition
+ self.should_assert(f)
+ self.should_assert(f, False)
+ self.should_not_assert(f, True)
+
+ m = self.should_assert(f, False, "message")
+ self.failUnlessEqual("postcondition: 'message' <type 'str'>", m)
+ m = self.should_assert(f, False, "message1", othermsg=12)
+ self.failUnlessEqual("postcondition: 'message1' <type 'str'>, othermsg: 12 <type 'int'>", m)
+ m = self.should_assert(f, False, othermsg="message2")
+ self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
+
+class FileUtil(unittest.TestCase):
+ def mkdir(self, basedir, path, mode=0777):
+ fn = os.path.join(basedir, path)
+ fileutil.make_dirs(fn, mode)
+
+ def touch(self, basedir, path, mode=None, data="touch\n"):
+ fn = os.path.join(basedir, path)
+ f = open(fn, "w")
+ f.write(data)
+ f.close()
+ if mode is not None:
+ os.chmod(fn, mode)
+
+ def test_rm_dir(self):
+ basedir = "util/FileUtil/test_rm_dir"
+ fileutil.make_dirs(basedir)
+ # create it again to test idempotency
+ fileutil.make_dirs(basedir)
+ d = os.path.join(basedir, "doomed")
+ self.mkdir(d, "a/b")
+ self.touch(d, "a/b/1.txt")
+ self.touch(d, "a/b/2.txt", 0444)
+ self.touch(d, "a/b/3.txt", 0)
+ self.mkdir(d, "a/c")
+ self.touch(d, "a/c/1.txt")
+ self.touch(d, "a/c/2.txt", 0444)
+ self.touch(d, "a/c/3.txt", 0)
+ os.chmod(os.path.join(d, "a/c"), 0444)
+ self.mkdir(d, "a/d")
+ self.touch(d, "a/d/1.txt")
+ self.touch(d, "a/d/2.txt", 0444)
+ self.touch(d, "a/d/3.txt", 0)
+ os.chmod(os.path.join(d, "a/d"), 0)
+
+ fileutil.rm_dir(d)
+ self.failIf(os.path.exists(d))
+ # remove it again to test idempotency
+ fileutil.rm_dir(d)
+
+ def test_remove_if_possible(self):
+ basedir = "util/FileUtil/test_remove_if_possible"
+ fileutil.make_dirs(basedir)
+ self.touch(basedir, "here")
+ fn = os.path.join(basedir, "here")
+ fileutil.remove_if_possible(fn)
+ self.failIf(os.path.exists(fn))
+ fileutil.remove_if_possible(fn) # should be idempotent
+ fileutil.rm_dir(basedir)
+ fileutil.remove_if_possible(fn) # should survive errors
+
+ def test_write_atomically(self):
+ basedir = "util/FileUtil/test_write_atomically"
+ fileutil.make_dirs(basedir)
+ fn = os.path.join(basedir, "here")
+ fileutil.write_atomically(fn, "one")
+ self.failUnlessEqual(fileutil.read(fn), "one")
+ fileutil.write_atomically(fn, "two", mode="") # non-binary
+ self.failUnlessEqual(fileutil.read(fn), "two")
+
+ def test_open_or_create(self):
+ basedir = "util/FileUtil/test_open_or_create"
+ fileutil.make_dirs(basedir)
+ fn = os.path.join(basedir, "here")
+ f = fileutil.open_or_create(fn)
+ f.write("stuff.")
+ f.close()
+ f = fileutil.open_or_create(fn)
+ f.seek(0, os.SEEK_END)
+ f.write("more.")
+ f.close()
+ f = open(fn, "r")
+ data = f.read()
+ f.close()
+ self.failUnlessEqual(data, "stuff.more.")
+
+ def test_NamedTemporaryDirectory(self):
+ basedir = "util/FileUtil/test_NamedTemporaryDirectory"
+ fileutil.make_dirs(basedir)
+ td = fileutil.NamedTemporaryDirectory(dir=basedir)
+ name = td.name
+ self.failUnless(basedir in name)
+ self.failUnless(basedir in repr(td))
+ self.failUnless(os.path.isdir(name))
+ del td
+ # it is conceivable that we need to force gc here, but I'm not sure
+ self.failIf(os.path.isdir(name))
+
+ def test_rename(self):
+ basedir = "util/FileUtil/test_rename"
+ fileutil.make_dirs(basedir)
+ self.touch(basedir, "here")
+ fn = os.path.join(basedir, "here")
+ fn2 = os.path.join(basedir, "there")
+ fileutil.rename(fn, fn2)
+ self.failIf(os.path.exists(fn))
+ self.failUnless(os.path.exists(fn2))
+
+ def test_du(self):
+ basedir = "util/FileUtil/test_du"
+ fileutil.make_dirs(basedir)
+ d = os.path.join(basedir, "space-consuming")
+ self.mkdir(d, "a/b")
+ self.touch(d, "a/b/1.txt", data="a"*10)
+ self.touch(d, "a/b/2.txt", data="b"*11)
+ self.mkdir(d, "a/c")
+ self.touch(d, "a/c/1.txt", data="c"*12)
+ self.touch(d, "a/c/2.txt", data="d"*13)
+
+ used = fileutil.du(basedir)
+ self.failUnlessEqual(10+11+12+13, used)
+
+ def test_abspath_expanduser_unicode(self):
+ self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, "bytestring")
+
+ saved_cwd = os.path.normpath(os.getcwdu())
+ abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
+ self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
+ self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
+ self.failUnlessEqual(abspath_cwd, saved_cwd)
+
+ # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
+
+ self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+ self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
+
+ cwds = ['cwd']
+ try:
+ cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
+ or 'ascii'))
+ except UnicodeEncodeError:
+ pass # the cwd can't be encoded -- test with ascii cwd only
+
+ for cwd in cwds:
+ try:
+ os.mkdir(cwd)
+ os.chdir(cwd)
+ for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
+ uabspath = fileutil.abspath_expanduser_unicode(upath)
+ self.failUnless(isinstance(uabspath, unicode), uabspath)
+ finally:
+ os.chdir(saved_cwd)
+
+ def test_disk_stats(self):
+ avail = fileutil.get_available_space('.', 2**14)
+ if avail == 0:
+ raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+ disk = fileutil.get_disk_stats('.', 2**13)
+ self.failUnless(disk['total'] > 0, disk['total'])
+ self.failUnless(disk['used'] > 0, disk['used'])
+ self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+ self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+ self.failUnless(disk['avail'] > 0, disk['avail'])
+
+ def test_disk_stats_avail_nonnegative(self):
+ # This test will spuriously fail if you have more than 2^128
+ # bytes of available space on your filesystem.
+ disk = fileutil.get_disk_stats('.', 2**128)
+ self.failUnlessEqual(disk['avail'], 0)
+
+class PollMixinTests(unittest.TestCase):
+ def setUp(self):
+ self.pm = pollmixin.PollMixin()
+
+ def test_PollMixin_True(self):
+ d = self.pm.poll(check_f=lambda : True,
+ pollinterval=0.1)
+ return d
+
+ def test_PollMixin_False_then_True(self):
+ i = iter([False, True])
+ d = self.pm.poll(check_f=i.next,
+ pollinterval=0.1)
+ return d
+
+ def test_timeout(self):
+ d = self.pm.poll(check_f=lambda: False,
+ pollinterval=0.01,
+ timeout=1)
+ def _suc(res):
+ self.fail("poll should have failed, not returned %s" % (res,))
+ def _err(f):
+ f.trap(pollmixin.TimeoutError)
+ return None # success
+ d.addCallbacks(_suc, _err)
+ return d
+
+class DeferredUtilTests(unittest.TestCase):
+ def test_gather_results(self):
+ d1 = defer.Deferred()
+ d2 = defer.Deferred()
+ res = deferredutil.gatherResults([d1, d2])
+ d1.errback(ValueError("BAD"))
+ def _callb(res):
+ self.fail("Should have errbacked, not resulted in %s" % (res,))
+ def _errb(thef):
+ thef.trap(ValueError)
+ res.addCallbacks(_callb, _errb)
+ return res
+
+ def test_success(self):
+ d1, d2 = defer.Deferred(), defer.Deferred()
+ good = []
+ bad = []
+ dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
+ dlss.addCallbacks(good.append, bad.append)
+ d1.callback(1)
+ d2.callback(2)
+ self.failUnlessEqual(good, [[1,2]])
+ self.failUnlessEqual(bad, [])
+
+ def test_failure(self):
+ d1, d2 = defer.Deferred(), defer.Deferred()
+ good = []
+ bad = []
+ dlss = deferredutil.DeferredListShouldSucceed([d1,d2])
+ dlss.addCallbacks(good.append, bad.append)
+ d1.addErrback(lambda _ignore: None)
+ d2.addErrback(lambda _ignore: None)
+ d1.callback(1)
+ d2.errback(ValueError())
+ self.failUnlessEqual(good, [])
+ self.failUnlessEqual(len(bad), 1)
+ f = bad[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(ValueError))
+
+class HashUtilTests(unittest.TestCase):
+
+ def test_random_key(self):
+ k = hashutil.random_key()
+ self.failUnlessEqual(len(k), hashutil.KEYLEN)
+
+ def test_sha256d(self):
+ h1 = hashutil.tagged_hash("tag1", "value")
+ h2 = hashutil.tagged_hasher("tag1")
+ h2.update("value")
+ h2a = h2.digest()
+ h2b = h2.digest()
+ self.failUnlessEqual(h1, h2a)
+ self.failUnlessEqual(h2a, h2b)
+
+ def test_sha256d_truncated(self):
+ h1 = hashutil.tagged_hash("tag1", "value", 16)
+ h2 = hashutil.tagged_hasher("tag1", 16)
+ h2.update("value")
+ h2 = h2.digest()
+ self.failUnlessEqual(len(h1), 16)
+ self.failUnlessEqual(len(h2), 16)
+ self.failUnlessEqual(h1, h2)
+
+ def test_chk(self):
+ h1 = hashutil.convergence_hash(3, 10, 1000, "data", "secret")
+ h2 = hashutil.convergence_hasher(3, 10, 1000, "secret")
+ h2.update("data")
+ h2 = h2.digest()
+ self.failUnlessEqual(h1, h2)
+
+ def test_hashers(self):
+ h1 = hashutil.block_hash("foo")
+ h2 = hashutil.block_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ h1 = hashutil.uri_extension_hash("foo")
+ h2 = hashutil.uri_extension_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ h1 = hashutil.plaintext_hash("foo")
+ h2 = hashutil.plaintext_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ h1 = hashutil.crypttext_hash("foo")
+ h2 = hashutil.crypttext_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ h1 = hashutil.crypttext_segment_hash("foo")
+ h2 = hashutil.crypttext_segment_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ h1 = hashutil.plaintext_segment_hash("foo")
+ h2 = hashutil.plaintext_segment_hasher()
+ h2.update("foo")
+ self.failUnlessEqual(h1, h2.digest())
+
+ def test_constant_time_compare(self):
+ self.failUnless(hashutil.constant_time_compare("a", "a"))
+ self.failUnless(hashutil.constant_time_compare("ab", "ab"))
+ self.failIf(hashutil.constant_time_compare("a", "b"))
+ self.failIf(hashutil.constant_time_compare("a", "aa"))
+
+ def _testknown(self, hashf, expected_a, *args):
+ got = hashf(*args)
+ got_a = base32.b2a(got)
+ self.failUnlessEqual(got_a, expected_a)
+
+ def test_known_answers(self):
+ # assert backwards compatibility
+ self._testknown(hashutil.storage_index_hash, "qb5igbhcc5esa6lwqorsy7e6am", "")
+ self._testknown(hashutil.block_hash, "msjr5bh4evuh7fa3zw7uovixfbvlnstr5b65mrerwfnvjxig2jvq", "")
+ self._testknown(hashutil.uri_extension_hash, "wthsu45q7zewac2mnivoaa4ulh5xvbzdmsbuyztq2a5fzxdrnkka", "")
+ self._testknown(hashutil.plaintext_hash, "5lz5hwz3qj3af7n6e3arblw7xzutvnd3p3fjsngqjcb7utf3x3da", "")
+ self._testknown(hashutil.crypttext_hash, "itdj6e4njtkoiavlrmxkvpreosscssklunhwtvxn6ggho4rkqwga", "")
+ self._testknown(hashutil.crypttext_segment_hash, "aovy5aa7jej6ym5ikgwyoi4pxawnoj3wtaludjz7e2nb5xijb7aa", "")
+ self._testknown(hashutil.plaintext_segment_hash, "4fdgf6qruaisyukhqcmoth4t3li6bkolbxvjy4awwcpprdtva7za", "")
+ self._testknown(hashutil.convergence_hash, "3mo6ni7xweplycin6nowynw2we", 3, 10, 100, "", "converge")
+ self._testknown(hashutil.my_renewal_secret_hash, "ujhr5k5f7ypkp67jkpx6jl4p47pyta7hu5m527cpcgvkafsefm6q", "")
+ self._testknown(hashutil.my_cancel_secret_hash, "rjwzmafe2duixvqy6h47f5wfrokdziry6zhx4smew4cj6iocsfaa", "")
+ self._testknown(hashutil.file_renewal_secret_hash, "hzshk2kf33gzbd5n3a6eszkf6q6o6kixmnag25pniusyaulqjnia", "", "si")
+ self._testknown(hashutil.file_cancel_secret_hash, "bfciwvr6w7wcavsngxzxsxxaszj72dej54n4tu2idzp6b74g255q", "", "si")
+ self._testknown(hashutil.bucket_renewal_secret_hash, "e7imrzgzaoashsncacvy3oysdd2m5yvtooo4gmj4mjlopsazmvuq", "", "\x00"*20)
+ self._testknown(hashutil.bucket_cancel_secret_hash, "dvdujeyxeirj6uux6g7xcf4lvesk632aulwkzjar7srildvtqwma", "", "\x00"*20)
+ self._testknown(hashutil.hmac, "c54ypfi6pevb3nvo6ba42jtglpkry2kbdopqsi7dgrm4r7tw5sra", "tag", "")
+ self._testknown(hashutil.mutable_rwcap_key_hash, "6rvn2iqrghii5n4jbbwwqqsnqu", "iv", "wk")
+ self._testknown(hashutil.ssk_writekey_hash, "ykpgmdbpgbb6yqz5oluw2q26ye", "")
+ self._testknown(hashutil.ssk_write_enabler_master_hash, "izbfbfkoait4dummruol3gy2bnixrrrslgye6ycmkuyujnenzpia", "")
+ self._testknown(hashutil.ssk_write_enabler_hash, "fuu2dvx7g6gqu5x22vfhtyed7p4pd47y5hgxbqzgrlyvxoev62tq", "wk", "\x00"*20)
+ self._testknown(hashutil.ssk_pubkey_fingerprint_hash, "3opzw4hhm2sgncjx224qmt5ipqgagn7h5zivnfzqycvgqgmgz35q", "")
+ self._testknown(hashutil.ssk_readkey_hash, "vugid4as6qbqgeq2xczvvcedai", "")
+ self._testknown(hashutil.ssk_readkey_data_hash, "73wsaldnvdzqaf7v4pzbr2ae5a", "iv", "rk")
+ self._testknown(hashutil.ssk_storage_index_hash, "j7icz6kigb6hxrej3tv4z7ayym", "")
+
+
+class Abbreviate(unittest.TestCase):
+ def test_time(self):
+ a = abbreviate.abbreviate_time
+ self.failUnlessEqual(a(None), "unknown")
+ self.failUnlessEqual(a(0), "0 seconds")
+ self.failUnlessEqual(a(1), "1 second")
+ self.failUnlessEqual(a(2), "2 seconds")
+ self.failUnlessEqual(a(119), "119 seconds")
+ MIN = 60
+ self.failUnlessEqual(a(2*MIN), "2 minutes")
+ self.failUnlessEqual(a(60*MIN), "60 minutes")
+ self.failUnlessEqual(a(179*MIN), "179 minutes")
+ HOUR = 60*MIN
+ self.failUnlessEqual(a(180*MIN), "3 hours")
+ self.failUnlessEqual(a(4*HOUR), "4 hours")
+ DAY = 24*HOUR
+ MONTH = 30*DAY
+ self.failUnlessEqual(a(2*DAY), "2 days")
+ self.failUnlessEqual(a(2*MONTH), "2 months")
+ YEAR = 365*DAY
+ self.failUnlessEqual(a(5*YEAR), "5 years")
+
+ def test_space(self):
+ tests_si = [(None, "unknown"),
+ (0, "0 B"),
+ (1, "1 B"),
+ (999, "999 B"),
+ (1000, "1000 B"),
+ (1023, "1023 B"),
+ (1024, "1.02 kB"),
+ (20*1000, "20.00 kB"),
+ (1024*1024, "1.05 MB"),
+ (1000*1000, "1.00 MB"),
+ (1000*1000*1000, "1.00 GB"),
+ (1000*1000*1000*1000, "1.00 TB"),
+ (1000*1000*1000*1000*1000, "1.00 PB"),
+ (1000*1000*1000*1000*1000*1000, "1.00 EB"),
+ (1234567890123456789, "1.23 EB"),
+ ]
+ for (x, expected) in tests_si:
+ got = abbreviate.abbreviate_space(x, SI=True)
+ self.failUnlessEqual(got, expected)
+
+ tests_base1024 = [(None, "unknown"),
+ (0, "0 B"),
+ (1, "1 B"),
+ (999, "999 B"),
+ (1000, "1000 B"),
+ (1023, "1023 B"),
+ (1024, "1.00 kiB"),
+ (20*1024, "20.00 kiB"),
+ (1000*1000, "976.56 kiB"),
+ (1024*1024, "1.00 MiB"),
+ (1024*1024*1024, "1.00 GiB"),
+ (1024*1024*1024*1024, "1.00 TiB"),
+ (1000*1000*1000*1000*1000, "909.49 TiB"),
+ (1024*1024*1024*1024*1024, "1.00 PiB"),
+ (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
+ (1234567890123456789, "1.07 EiB"),
+ ]
+ for (x, expected) in tests_base1024:
+ got = abbreviate.abbreviate_space(x, SI=False)
+ self.failUnlessEqual(got, expected)
+
+ self.failUnlessEqual(abbreviate.abbreviate_space_both(1234567),
+ "(1.23 MB, 1.18 MiB)")
+
+ def test_parse_space(self):
+ p = abbreviate.parse_abbreviated_size
+ self.failUnlessEqual(p(""), None)
+ self.failUnlessEqual(p(None), None)
+ self.failUnlessEqual(p("123"), 123)
+ self.failUnlessEqual(p("123B"), 123)
+ self.failUnlessEqual(p("2K"), 2000)
+ self.failUnlessEqual(p("2kb"), 2000)
+ self.failUnlessEqual(p("2KiB"), 2048)
+ self.failUnlessEqual(p("10MB"), 10*1000*1000)
+ self.failUnlessEqual(p("10MiB"), 10*1024*1024)
+ self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
+ self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
+ self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+ self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+ self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+ self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+
+ e = self.failUnlessRaises(ValueError, p, "12 cubits")
+ self.failUnlessIn("12 cubits", str(e))
+ e = self.failUnlessRaises(ValueError, p, "1 BB")
+ self.failUnlessIn("1 BB", str(e))
+ e = self.failUnlessRaises(ValueError, p, "fhtagn")
+ self.failUnlessIn("fhtagn", str(e))
+
+class Limiter(unittest.TestCase):
+ timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
+
+ def job(self, i, foo):
+ self.calls.append( (i, foo) )
+ self.simultaneous += 1
+ self.peak_simultaneous = max(self.simultaneous, self.peak_simultaneous)
+ d = defer.Deferred()
+ def _done():
+ self.simultaneous -= 1
+ d.callback("done %d" % i)
+ reactor.callLater(1.0, _done)
+ return d
+
+ def bad_job(self, i, foo):
+ raise ValueError("bad_job %d" % i)
+
+ def test_limiter(self):
+ self.calls = []
+ self.simultaneous = 0
+ self.peak_simultaneous = 0
+ l = limiter.ConcurrencyLimiter()
+ dl = []
+ for i in range(20):
+ dl.append(l.add(self.job, i, foo=str(i)))
+ d = defer.DeferredList(dl, fireOnOneErrback=True)
+ def _done(res):
+ self.failUnlessEqual(self.simultaneous, 0)
+ self.failUnless(self.peak_simultaneous <= 10)
+ self.failUnlessEqual(len(self.calls), 20)
+ for i in range(20):
+ self.failUnless( (i, str(i)) in self.calls)
+ d.addCallback(_done)
+ return d
+
+ def test_errors(self):
+ self.calls = []
+ self.simultaneous = 0
+ self.peak_simultaneous = 0
+ l = limiter.ConcurrencyLimiter()
+ dl = []
+ for i in range(20):
+ dl.append(l.add(self.job, i, foo=str(i)))
+ d2 = l.add(self.bad_job, 21, "21")
+ d = defer.DeferredList(dl, fireOnOneErrback=True)
+ def _most_done(res):
+ results = []
+ for (success, result) in res:
+ self.failUnlessEqual(success, True)
+ results.append(result)
+ results.sort()
+ expected_results = ["done %d" % i for i in range(20)]
+ expected_results.sort()
+ self.failUnlessEqual(results, expected_results)
+ self.failUnless(self.peak_simultaneous <= 10)
+ self.failUnlessEqual(len(self.calls), 20)
+ for i in range(20):
+ self.failUnless( (i, str(i)) in self.calls)
+ def _good(res):
+ self.fail("should have failed, not got %s" % (res,))
+ def _err(f):
+ f.trap(ValueError)
+ self.failUnless("bad_job 21" in str(f))
+ d2.addCallbacks(_good, _err)
+ return d2
+ d.addCallback(_most_done)
+ def _all_done(res):
+ self.failUnlessEqual(self.simultaneous, 0)
+ self.failUnless(self.peak_simultaneous <= 10)
+ self.failUnlessEqual(len(self.calls), 20)
+ for i in range(20):
+ self.failUnless( (i, str(i)) in self.calls)
+ d.addCallback(_all_done)
+ return d
+
+class TimeFormat(unittest.TestCase):
+ def test_epoch(self):
+ return self._help_test_epoch()
+
+ def test_epoch_in_London(self):
+ # Europe/London is a particularly troublesome timezone. Nowadays, its
+ # offset from GMT is 0. But in 1970, its offset from GMT was 1.
+ # (Apparently in 1970 Britain had redefined standard time to be GMT+1
+ # and stayed in standard time all year round, whereas today
+ # Europe/London standard time is GMT and Europe/London Daylight
+ # Savings Time is GMT+1.) The current implementation of
+ # time_format.iso_utc_time_to_localseconds() breaks if the timezone is
+ # Europe/London. (As soon as this unit test is done then I'll change
+ # that implementation to something that works even in this case...)
+ origtz = os.environ.get('TZ')
+ os.environ['TZ'] = "Europe/London"
+ if hasattr(time, 'tzset'):
+ time.tzset()
+ try:
+ return self._help_test_epoch()
+ finally:
+ if origtz is None:
+ del os.environ['TZ']
+ else:
+ os.environ['TZ'] = origtz
+ if hasattr(time, 'tzset'):
+ time.tzset()
+
+ def _help_test_epoch(self):
+ origtzname = time.tzname
+ s = time_format.iso_utc_time_to_seconds("1970-01-01T00:00:01")
+ self.failUnlessEqual(s, 1.0)
+ s = time_format.iso_utc_time_to_seconds("1970-01-01_00:00:01")
+ self.failUnlessEqual(s, 1.0)
+ s = time_format.iso_utc_time_to_seconds("1970-01-01 00:00:01")
+ self.failUnlessEqual(s, 1.0)
+
+ self.failUnlessEqual(time_format.iso_utc(1.0), "1970-01-01_00:00:01")
+ self.failUnlessEqual(time_format.iso_utc(1.0, sep=" "),
+ "1970-01-01 00:00:01")
+
+ now = time.time()
+ isostr = time_format.iso_utc(now)
+ timestamp = time_format.iso_utc_time_to_seconds(isostr)
+ self.failUnlessEqual(int(timestamp), int(now))
+
+ def my_time():
+ return 1.0
+ self.failUnlessEqual(time_format.iso_utc(t=my_time),
+ "1970-01-01_00:00:01")
+ e = self.failUnlessRaises(ValueError,
+ time_format.iso_utc_time_to_seconds,
+ "invalid timestring")
+ self.failUnless("not a complete ISO8601 timestamp" in str(e))
+ s = time_format.iso_utc_time_to_seconds("1970-01-01_00:00:01.500")
+ self.failUnlessEqual(s, 1.5)
+
+ # Look for daylight-savings-related errors.
+ thatmomentinmarch = time_format.iso_utc_time_to_seconds("2009-03-20 21:49:02.226536")
+ self.failUnlessEqual(thatmomentinmarch, 1237585742.226536)
+ self.failUnlessEqual(origtzname, time.tzname)
+
+ def test_iso_utc(self):
+ when = 1266760143.7841301
+ out = time_format.iso_utc_date(when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc_date(t=lambda: when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc(when)
+ self.failUnlessEqual(out, "2010-02-21_13:49:03.784130")
+ out = time_format.iso_utc(when, sep="-")
+ self.failUnlessEqual(out, "2010-02-21-13:49:03.784130")
+
+ def test_parse_duration(self):
+ p = time_format.parse_duration
+ DAY = 24*60*60
+ self.failUnlessEqual(p("1 day"), DAY)
+ self.failUnlessEqual(p("2 days"), 2*DAY)
+ self.failUnlessEqual(p("3 months"), 3*31*DAY)
+ self.failUnlessEqual(p("4 mo"), 4*31*DAY)
+ self.failUnlessEqual(p("5 years"), 5*365*DAY)
+ e = self.failUnlessRaises(ValueError, p, "123")
+ self.failUnlessIn("no unit (like day, month, or year) in '123'",
+ str(e))
+
+ def test_parse_date(self):
+ self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
+
+class CacheDir(unittest.TestCase):
+ def test_basic(self):
+ basedir = "test_util/CacheDir/test_basic"
+
+ def _failIfExists(name):
+ absfn = os.path.join(basedir, name)
+ self.failIf(os.path.exists(absfn),
+ "%s exists but it shouldn't" % absfn)
+
+ def _failUnlessExists(name):
+ absfn = os.path.join(basedir, name)
+ self.failUnless(os.path.exists(absfn),
+ "%s doesn't exist but it should" % absfn)
+
+ cdm = cachedir.CacheDirectoryManager(basedir)
+ a = cdm.get_file("a")
+ b = cdm.get_file("b")
+ c = cdm.get_file("c")
+ f = open(a.get_filename(), "wb"); f.write("hi"); f.close(); del f
+ f = open(b.get_filename(), "wb"); f.write("hi"); f.close(); del f
+ f = open(c.get_filename(), "wb"); f.write("hi"); f.close(); del f
+
+ _failUnlessExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+
+ cdm.check()
+
+ _failUnlessExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+
+ del a
+ # this file won't be deleted yet, because it isn't old enough
+ cdm.check()
+ _failUnlessExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+
+ # we change the definition of "old" to make everything old
+ cdm.old = -10
+
+ cdm.check()
+ _failIfExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+
+ cdm.old = 60*60
+
+ del b
+
+ cdm.check()
+ _failIfExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+
+ b2 = cdm.get_file("b")
+
+ cdm.check()
+ _failIfExists("a")
+ _failUnlessExists("b")
+ _failUnlessExists("c")
+ del b2
+
+ctr = [0]
+class EqButNotIs:
+ def __init__(self, x):
+ self.x = x
+ self.hash = ctr[0]
+ ctr[0] += 1
+ def __repr__(self):
+ return "<%s %s>" % (self.__class__.__name__, self.x,)
+ def __hash__(self):
+ return self.hash
+ def __le__(self, other):
+ return self.x <= other
+ def __lt__(self, other):
+ return self.x < other
+ def __ge__(self, other):
+ return self.x >= other
+ def __gt__(self, other):
+ return self.x > other
+ def __ne__(self, other):
+ return self.x != other
+ def __eq__(self, other):
+ return self.x == other
+
+class DictUtil(unittest.TestCase):
+ def _help_test_empty_dict(self, klass):
+ d1 = klass()
+ d2 = klass({})
+
+ self.failUnless(d1 == d2, "d1: %r, d2: %r" % (d1, d2,))
+ self.failUnless(len(d1) == 0)
+ self.failUnless(len(d2) == 0)
+
+ def _help_test_nonempty_dict(self, klass):
+ d1 = klass({'a': 1, 'b': "eggs", 3: "spam",})
+ d2 = klass({'a': 1, 'b': "eggs", 3: "spam",})
+
+ self.failUnless(d1 == d2)
+ self.failUnless(len(d1) == 3, "%s, %s" % (len(d1), d1,))
+ self.failUnless(len(d2) == 3)
+
+ def _help_test_eq_but_notis(self, klass):
+ d = klass({'a': 3, 'b': EqButNotIs(3), 'c': 3})
+ d.pop('b')
+
+ d.clear()
+ d['a'] = 3
+ d['b'] = EqButNotIs(3)
+ d['c'] = 3
+ d.pop('b')
+
+ d.clear()
+ d['b'] = EqButNotIs(3)
+ d['a'] = 3
+ d['c'] = 3
+ d.pop('b')
+
+ d.clear()
+ d['a'] = EqButNotIs(3)
+ d['c'] = 3
+ d['a'] = 3
+
+ d.clear()
+ fake3 = EqButNotIs(3)
+ fake7 = EqButNotIs(7)
+ d[fake3] = fake7
+ d[3] = 7
+ d[3] = 8
+ self.failUnless(filter(lambda x: x is 8, d.itervalues()))
+ self.failUnless(filter(lambda x: x is fake7, d.itervalues()))
+ # The real 7 should have been ejected by the d[3] = 8.
+ self.failUnless(not filter(lambda x: x is 7, d.itervalues()))
+ self.failUnless(filter(lambda x: x is fake3, d.iterkeys()))
+ self.failUnless(filter(lambda x: x is 3, d.iterkeys()))
+ d[fake3] = 8
+
+ d.clear()
+ d[3] = 7
+ fake3 = EqButNotIs(3)
+ fake7 = EqButNotIs(7)
+ d[fake3] = fake7
+ d[3] = 8
+ self.failUnless(filter(lambda x: x is 8, d.itervalues()))
+ self.failUnless(filter(lambda x: x is fake7, d.itervalues()))
+ # The real 7 should have been ejected by the d[3] = 8.
+ self.failUnless(not filter(lambda x: x is 7, d.itervalues()))
+ self.failUnless(filter(lambda x: x is fake3, d.iterkeys()))
+ self.failUnless(filter(lambda x: x is 3, d.iterkeys()))
+ d[fake3] = 8
+
+ def test_all(self):
+ self._help_test_eq_but_notis(dictutil.UtilDict)
+ self._help_test_eq_but_notis(dictutil.NumDict)
+ self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
+ self._help_test_nonempty_dict(dictutil.UtilDict)
+ self._help_test_nonempty_dict(dictutil.NumDict)
+ self._help_test_nonempty_dict(dictutil.ValueOrderedDict)
+ self._help_test_eq_but_notis(dictutil.UtilDict)
+ self._help_test_eq_but_notis(dictutil.NumDict)
+ self._help_test_eq_but_notis(dictutil.ValueOrderedDict)
+
+ def test_dict_of_sets(self):
+ ds = dictutil.DictOfSets()
+ ds.add(1, "a")
+ ds.add(2, "b")
+ ds.add(2, "b")
+ ds.add(2, "c")
+ self.failUnlessEqual(ds[1], set(["a"]))
+ self.failUnlessEqual(ds[2], set(["b", "c"]))
+ ds.discard(3, "d") # should not raise an exception
+ ds.discard(2, "b")
+ self.failUnlessEqual(ds[2], set(["c"]))
+ ds.discard(2, "c")
+ self.failIf(2 in ds)
+
+ ds.add(3, "f")
+ ds2 = dictutil.DictOfSets()
+ ds2.add(3, "f")
+ ds2.add(3, "g")
+ ds2.add(4, "h")
+ ds.update(ds2)
+ self.failUnlessEqual(ds[1], set(["a"]))
+ self.failUnlessEqual(ds[3], set(["f", "g"]))
+ self.failUnlessEqual(ds[4], set(["h"]))
+
+ def test_move(self):
+ d1 = {1: "a", 2: "b"}
+ d2 = {2: "c", 3: "d"}
+ dictutil.move(1, d1, d2)
+ self.failUnlessEqual(d1, {2: "b"})
+ self.failUnlessEqual(d2, {1: "a", 2: "c", 3: "d"})
+
+ d1 = {1: "a", 2: "b"}
+ d2 = {2: "c", 3: "d"}
+ dictutil.move(2, d1, d2)
+ self.failUnlessEqual(d1, {1: "a"})
+ self.failUnlessEqual(d2, {2: "b", 3: "d"})
+
+ d1 = {1: "a", 2: "b"}
+ d2 = {2: "c", 3: "d"}
+ self.failUnlessRaises(KeyError, dictutil.move, 5, d1, d2, strict=True)
+
+ def test_subtract(self):
+ d1 = {1: "a", 2: "b"}
+ d2 = {2: "c", 3: "d"}
+ d3 = dictutil.subtract(d1, d2)
+ self.failUnlessEqual(d3, {1: "a"})
+
+ d1 = {1: "a", 2: "b"}
+ d2 = {2: "c"}
+ d3 = dictutil.subtract(d1, d2)
+ self.failUnlessEqual(d3, {1: "a"})
+
+ def test_utildict(self):
+ d = dictutil.UtilDict({1: "a", 2: "b"})
+ d.del_if_present(1)
+ d.del_if_present(3)
+ self.failUnlessEqual(d, {2: "b"})
+ def eq(a, b):
+ return a == b
+ self.failUnlessRaises(TypeError, eq, d, "not a dict")
+
+ d = dictutil.UtilDict({1: "b", 2: "a"})
+ self.failUnlessEqual(d.items_sorted_by_value(),
+ [(2, "a"), (1, "b")])
+ self.failUnlessEqual(d.items_sorted_by_key(),
+ [(1, "b"), (2, "a")])
+ self.failUnlessEqual(repr(d), "{1: 'b', 2: 'a'}")
+ self.failUnless(1 in d)
+
+ d2 = dictutil.UtilDict({3: "c", 4: "d"})
+ self.failUnless(d != d2)
+ self.failUnless(d2 > d)
+ self.failUnless(d2 >= d)
+ self.failUnless(d <= d2)
+ self.failUnless(d < d2)
+ self.failUnlessEqual(d[1], "b")
+ self.failUnlessEqual(sorted(list([k for k in d])), [1,2])
+
+ d3 = d.copy()
+ self.failUnlessEqual(d, d3)
+ self.failUnless(isinstance(d3, dictutil.UtilDict))
+
+ d4 = d.fromkeys([3,4], "e")
+ self.failUnlessEqual(d4, {3: "e", 4: "e"})
+
+ self.failUnlessEqual(d.get(1), "b")
+ self.failUnlessEqual(d.get(3), None)
+ self.failUnlessEqual(d.get(3, "default"), "default")
+ self.failUnlessEqual(sorted(list(d.items())),
+ [(1, "b"), (2, "a")])
+ self.failUnlessEqual(sorted(list(d.iteritems())),
+ [(1, "b"), (2, "a")])
+ self.failUnlessEqual(sorted(d.keys()), [1, 2])
+ self.failUnlessEqual(sorted(d.values()), ["a", "b"])
+ x = d.setdefault(1, "new")
+ self.failUnlessEqual(x, "b")
+ self.failUnlessEqual(d[1], "b")
+ x = d.setdefault(3, "new")
+ self.failUnlessEqual(x, "new")
+ self.failUnlessEqual(d[3], "new")
+ del d[3]
+
+ x = d.popitem()
+ self.failUnless(x in [(1, "b"), (2, "a")])
+ x = d.popitem()
+ self.failUnless(x in [(1, "b"), (2, "a")])
+ self.failUnlessRaises(KeyError, d.popitem)
+
+ def test_numdict(self):
+ d = dictutil.NumDict({"a": 1, "b": 2})
+
+ d.add_num("a", 10, 5)
+ d.add_num("c", 20, 5)
+ d.add_num("d", 30)
+ self.failUnlessEqual(d, {"a": 11, "b": 2, "c": 25, "d": 30})
+
+ d.subtract_num("a", 10)
+ d.subtract_num("e", 10)
+ d.subtract_num("f", 10, 15)
+ self.failUnlessEqual(d, {"a": 1, "b": 2, "c": 25, "d": 30,
+ "e": -10, "f": 5})
+
+ self.failUnlessEqual(d.sum(), sum([1, 2, 25, 30, -10, 5]))
+
+ d = dictutil.NumDict()
+ d.inc("a")
+ d.inc("a")
+ d.inc("b", 5)
+ self.failUnlessEqual(d, {"a": 2, "b": 6})
+ d.dec("a")
+ d.dec("c")
+ d.dec("d", 5)
+ self.failUnlessEqual(d, {"a": 1, "b": 6, "c": -1, "d": 4})
+ self.failUnlessEqual(d.items_sorted_by_key(),
+ [("a", 1), ("b", 6), ("c", -1), ("d", 4)])
+ self.failUnlessEqual(d.items_sorted_by_value(),
+ [("c", -1), ("a", 1), ("d", 4), ("b", 6)])
+ self.failUnlessEqual(d.item_with_largest_value(), ("b", 6))
+
+ d = dictutil.NumDict({"a": 1, "b": 2})
+ self.failUnlessEqual(repr(d), "{'a': 1, 'b': 2}")
+ self.failUnless("a" in d)
+
+ d2 = dictutil.NumDict({"c": 3, "d": 4})
+ self.failUnless(d != d2)
+ self.failUnless(d2 > d)
+ self.failUnless(d2 >= d)
+ self.failUnless(d <= d2)
+ self.failUnless(d < d2)
+ self.failUnlessEqual(d["a"], 1)
+ self.failUnlessEqual(sorted(list([k for k in d])), ["a","b"])
+ def eq(a, b):
+ return a == b
+ self.failUnlessRaises(TypeError, eq, d, "not a dict")
+
+ d3 = d.copy()
+ self.failUnlessEqual(d, d3)
+ self.failUnless(isinstance(d3, dictutil.NumDict))
+
+ d4 = d.fromkeys(["a","b"], 5)
+ self.failUnlessEqual(d4, {"a": 5, "b": 5})
+
+ self.failUnlessEqual(d.get("a"), 1)
+ self.failUnlessEqual(d.get("c"), 0)
+ self.failUnlessEqual(d.get("c", 5), 5)
+ self.failUnlessEqual(sorted(list(d.items())),
+ [("a", 1), ("b", 2)])
+ self.failUnlessEqual(sorted(list(d.iteritems())),
+ [("a", 1), ("b", 2)])
+ self.failUnlessEqual(sorted(d.keys()), ["a", "b"])
+ self.failUnlessEqual(sorted(d.values()), [1, 2])
+ self.failUnless(d.has_key("a"))
+ self.failIf(d.has_key("c"))
+
+ x = d.setdefault("c", 3)
+ self.failUnlessEqual(x, 3)
+ self.failUnlessEqual(d["c"], 3)
+ x = d.setdefault("c", 5)
+ self.failUnlessEqual(x, 3)
+ self.failUnlessEqual(d["c"], 3)
+ del d["c"]
+
+ x = d.popitem()
+ self.failUnless(x in [("a", 1), ("b", 2)])
+ x = d.popitem()
+ self.failUnless(x in [("a", 1), ("b", 2)])
+ self.failUnlessRaises(KeyError, d.popitem)
+
+ d.update({"c": 3})
+ d.update({"c": 4, "d": 5})
+ self.failUnlessEqual(d, {"c": 4, "d": 5})
+
+ def test_del_if_present(self):
+ d = {1: "a", 2: "b"}
+ dictutil.del_if_present(d, 1)
+ dictutil.del_if_present(d, 3)
+ self.failUnlessEqual(d, {2: "b"})
+
+ def test_valueordereddict(self):
+ d = dictutil.ValueOrderedDict()
+ d["a"] = 3
+ d["b"] = 2
+ d["c"] = 1
+
+ self.failUnlessEqual(d, {"a": 3, "b": 2, "c": 1})
+ self.failUnlessEqual(d.items(), [("c", 1), ("b", 2), ("a", 3)])
+ self.failUnlessEqual(d.values(), [1, 2, 3])
+ self.failUnlessEqual(d.keys(), ["c", "b", "a"])
+ self.failUnlessEqual(repr(d), "<ValueOrderedDict {c: 1, b: 2, a: 3}>")
+ def eq(a, b):
+ return a == b
+ self.failIf(d == {"a": 4})
+ self.failUnless(d != {"a": 4})
+
+ x = d.setdefault("d", 0)
+ self.failUnlessEqual(x, 0)
+ self.failUnlessEqual(d["d"], 0)
+ x = d.setdefault("d", -1)
+ self.failUnlessEqual(x, 0)
+ self.failUnlessEqual(d["d"], 0)
+
+ x = d.remove("e", "default", False)
+ self.failUnlessEqual(x, "default")
+ self.failUnlessRaises(KeyError, d.remove, "e", "default", True)
+ x = d.remove("d", 5)
+ self.failUnlessEqual(x, 0)
+
+ x = d.__getitem__("c")
+ self.failUnlessEqual(x, 1)
+ x = d.__getitem__("e", "default", False)
+ self.failUnlessEqual(x, "default")
+ self.failUnlessRaises(KeyError, d.__getitem__, "e", "default", True)
+
+ self.failUnlessEqual(d.popitem(), ("c", 1))
+ self.failUnlessEqual(d.popitem(), ("b", 2))
+ self.failUnlessEqual(d.popitem(), ("a", 3))
+ self.failUnlessRaises(KeyError, d.popitem)
+
+ d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
+ x = d.pop("d", "default", False)
+ self.failUnlessEqual(x, "default")
+ self.failUnlessRaises(KeyError, d.pop, "d", "default", True)
+ x = d.pop("b")
+ self.failUnlessEqual(x, 2)
+ self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
+
+ d = dictutil.ValueOrderedDict({"a": 3, "b": 2, "c": 1})
+ x = d.pop_from_list(1) # pop the second item, b/2
+ self.failUnlessEqual(x, "b")
+ self.failUnlessEqual(d.items(), [("c", 1), ("a", 3)])
+
+ def test_auxdict(self):
+ d = dictutil.AuxValueDict()
+ # we put the serialized form in the auxdata
+ d.set_with_aux("key", ("filecap", "metadata"), "serialized")
+
+ self.failUnlessEqual(d.keys(), ["key"])
+ self.failUnlessEqual(d["key"], ("filecap", "metadata"))
+ self.failUnlessEqual(d.get_aux("key"), "serialized")
+ def _get_missing(key):
+ return d[key]
+ self.failUnlessRaises(KeyError, _get_missing, "nonkey")
+ self.failUnlessEqual(d.get("nonkey"), None)
+ self.failUnlessEqual(d.get("nonkey", "nonvalue"), "nonvalue")
+ self.failUnlessEqual(d.get_aux("nonkey"), None)
+ self.failUnlessEqual(d.get_aux("nonkey", "nonvalue"), "nonvalue")
+
+ d["key"] = ("filecap2", "metadata2")
+ self.failUnlessEqual(d["key"], ("filecap2", "metadata2"))
+ self.failUnlessEqual(d.get_aux("key"), None)
+
+ d.set_with_aux("key2", "value2", "aux2")
+ self.failUnlessEqual(sorted(d.keys()), ["key", "key2"])
+ del d["key2"]
+ self.failUnlessEqual(d.keys(), ["key"])
+ self.failIf("key2" in d)
+ self.failUnlessRaises(KeyError, _get_missing, "key2")
+ self.failUnlessEqual(d.get("key2"), None)
+ self.failUnlessEqual(d.get_aux("key2"), None)
+ d["key2"] = "newvalue2"
+ self.failUnlessEqual(d.get("key2"), "newvalue2")
+ self.failUnlessEqual(d.get_aux("key2"), None)
+
+ d = dictutil.AuxValueDict({1:2,3:4})
+ self.failUnlessEqual(sorted(d.keys()), [1,3])
+ self.failUnlessEqual(d[1], 2)
+ self.failUnlessEqual(d.get_aux(1), None)
+
+ d = dictutil.AuxValueDict([ (1,2), (3,4) ])
+ self.failUnlessEqual(sorted(d.keys()), [1,3])
+ self.failUnlessEqual(d[1], 2)
+ self.failUnlessEqual(d.get_aux(1), None)
+
+ d = dictutil.AuxValueDict(one=1, two=2)
+ self.failUnlessEqual(sorted(d.keys()), ["one","two"])
+ self.failUnlessEqual(d["one"], 1)
+ self.failUnlessEqual(d.get_aux("one"), None)
+
+class Pipeline(unittest.TestCase):
+ def pause(self, *args, **kwargs):
+ d = defer.Deferred()
+ self.calls.append( (d, args, kwargs) )
+ return d
+
+ def failUnlessCallsAre(self, expected):
+ #print self.calls
+ #print expected
+ self.failUnlessEqual(len(self.calls), len(expected), self.calls)
+ for i,c in enumerate(self.calls):
+ self.failUnlessEqual(c[1:], expected[i], str(i))
+
+ def test_basic(self):
+ self.calls = []
+ finished = []
+ p = pipeline.Pipeline(100)
+
+ d = p.flush() # fires immediately
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ finished = []
+
+ d = p.add(10, self.pause, "one")
+ # the call should start right away, and our return Deferred should
+ # fire right away
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessCallsAre([ ( ("one",) , {} ) ])
+ self.failUnlessEqual(p.gauge, 10)
+
+ # pipeline: [one]
+
+ finished = []
+ d = p.add(20, self.pause, "two", kw=2)
+ # pipeline: [one, two]
+
+ # the call and the Deferred should fire right away
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessCallsAre([ ( ("one",) , {} ),
+ ( ("two",) , {"kw": 2} ),
+ ])
+ self.failUnlessEqual(p.gauge, 30)
+
+ self.calls[0][0].callback("one-result")
+ # pipeline: [two]
+ self.failUnlessEqual(p.gauge, 20)
+
+ finished = []
+ d = p.add(90, self.pause, "three", "posarg1")
+ # pipeline: [two, three]
+ flushed = []
+ fd = p.flush()
+ fd.addCallbacks(flushed.append, log.err)
+ self.failUnlessEqual(flushed, [])
+
+ # the call will be made right away, but the return Deferred will not,
+ # because the pipeline is now full.
+ d.addCallbacks(finished.append, log.err)
+ self.failUnlessEqual(len(finished), 0)
+ self.failUnlessCallsAre([ ( ("one",) , {} ),
+ ( ("two",) , {"kw": 2} ),
+ ( ("three", "posarg1"), {} ),
+ ])
+ self.failUnlessEqual(p.gauge, 110)
+
+ self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
+
+ # retiring either call will unblock the pipeline, causing the #3
+ # Deferred to fire
+ self.calls[2][0].callback("three-result")
+ # pipeline: [two]
+
+ self.failUnlessEqual(len(finished), 1)
+ self.failUnlessEqual(finished[0], None)
+ self.failUnlessEqual(flushed, [])
+
+ # retiring call#2 will finally allow the flush() Deferred to fire
+ self.calls[1][0].callback("two-result")
+ self.failUnlessEqual(len(flushed), 1)
+
+ def test_errors(self):
+ self.calls = []
+ p = pipeline.Pipeline(100)
+
+ d1 = p.add(200, self.pause, "one")
+ d2 = p.flush()
+
+ finished = []
+ d1.addBoth(finished.append)
+ self.failUnlessEqual(finished, [])
+
+ flushed = []
+ d2.addBoth(flushed.append)
+ self.failUnlessEqual(flushed, [])
+
+ self.calls[0][0].errback(ValueError("oops"))
+
+ self.failUnlessEqual(len(finished), 1)
+ f = finished[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ self.failUnlessIn("PipelineError", str(f.value))
+ self.failUnlessIn("ValueError", str(f.value))
+ r = repr(f.value)
+ self.failUnless("ValueError" in r, r)
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ # now that the pipeline is in the failed state, any new calls will
+ # fail immediately
+
+ d3 = p.add(20, self.pause, "two")
+
+ finished = []
+ d3.addBoth(finished.append)
+ self.failUnlessEqual(len(finished), 1)
+ f = finished[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ r = repr(f.value)
+ self.failUnless("ValueError" in r, r)
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ d4 = p.flush()
+ flushed = []
+ d4.addBoth(flushed.append)
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ def test_errors2(self):
+ self.calls = []
+ p = pipeline.Pipeline(100)
+
+ d1 = p.add(10, self.pause, "one")
+ d2 = p.add(20, self.pause, "two")
+ d3 = p.add(30, self.pause, "three")
+ d4 = p.flush()
+
+ # one call fails, then the second one succeeds: make sure
+ # ExpandableDeferredList tolerates the second one
+
+ flushed = []
+ d4.addBoth(flushed.append)
+ self.failUnlessEqual(flushed, [])
+
+ self.calls[0][0].errback(ValueError("oops"))
+ self.failUnlessEqual(len(flushed), 1)
+ f = flushed[0]
+ self.failUnless(isinstance(f, Failure))
+ self.failUnless(f.check(pipeline.PipelineError))
+ f2 = f.value.error
+ self.failUnless(f2.check(ValueError))
+
+ self.calls[1][0].callback("two-result")
+ self.calls[2][0].errback(ValueError("three-error"))
+
+ del d1,d2,d3,d4
+
+class SampleError(Exception):
+ pass
+
+class Log(unittest.TestCase):
+ def test_err(self):
+ if not hasattr(self, "flushLoggedErrors"):
+ # without flushLoggedErrors, we can't get rid of the
+ # twisted.log.err that tahoe_log records, so we can't keep this
+ # test from [ERROR]ing
+ raise unittest.SkipTest("needs flushLoggedErrors from Twisted-2.5.0")
+ try:
+ raise SampleError("simple sample")
+ except:
+ f = Failure()
+ tahoe_log.err(format="intentional sample error",
+ failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
+ self.flushLoggedErrors(SampleError)
+
+
+class SimpleSpans:
+ # this is a simple+inefficient form of util.spans.Spans . We compare the
+ # behavior of this reference model against the real (efficient) form.
+
+ def __init__(self, _span_or_start=None, length=None):
+ self._have = set()
+ if length is not None:
+ for i in range(_span_or_start, _span_or_start+length):
+ self._have.add(i)
+ elif _span_or_start:
+ for (start,length) in _span_or_start:
+ self.add(start, length)
+
+ def add(self, start, length):
+ for i in range(start, start+length):
+ self._have.add(i)
+ return self
+
+ def remove(self, start, length):
+ for i in range(start, start+length):
+ self._have.discard(i)
+ return self
+
+ def each(self):
+ return sorted(self._have)
+
+ def __iter__(self):
+ items = sorted(self._have)
+ prevstart = None
+ prevend = None
+ for i in items:
+ if prevstart is None:
+ prevstart = prevend = i
+ continue
+ if i == prevend+1:
+ prevend = i
+ continue
+ yield (prevstart, prevend-prevstart+1)
+ prevstart = prevend = i
+ if prevstart is not None:
+ yield (prevstart, prevend-prevstart+1)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+
+ def len(self):
+ return len(self._have)
+
+ def __add__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.add(start, length)
+ return s
+
+ def __sub__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.remove(start, length)
+ return s
+
+ def __iadd__(self, other):
+ for (start, length) in other:
+ self.add(start, length)
+ return self
+
+ def __isub__(self, other):
+ for (start, length) in other:
+ self.remove(start, length)
+ return self
+
+ def __and__(self, other):
+ s = self.__class__()
+ for i in other.each():
+ if i in self._have:
+ s.add(i, 1)
+ return s
+
+ def __contains__(self, (start,length)):
+ for i in range(start, start+length):
+ if i not in self._have:
+ return False
+ return True
+
+class ByteSpans(unittest.TestCase):
+ def test_basic(self):
+ s = Spans()
+ self.failUnlessEqual(list(s), [])
+ self.failIf(s)
+ self.failIf((0,1) in s)
+ self.failUnlessEqual(s.len(), 0)
+
+ s1 = Spans(3, 4) # 3,4,5,6
+ self._check1(s1)
+
+ s1 = Spans(3L, 4L) # 3,4,5,6
+ self._check1(s1)
+
+ s2 = Spans(s1)
+ self._check1(s2)
+
+ s2.add(10,2) # 10,11
+ self._check1(s1)
+ self.failUnless((10,1) in s2)
+ self.failIf((10,1) in s1)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s2.add(15,2).add(20,2)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
+ self.failUnlessEqual(s2.len(), 10)
+
+ s2.remove(4,3).remove(15,1)
+ self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s1 = SimpleSpans(3, 4) # 3 4 5 6
+ s2 = SimpleSpans(5, 4) # 5 6 7 8
+ i = s1 & s2
+ self.failUnlessEqual(list(i.each()), [5, 6])
+
+ def _check1(self, s):
+ self.failUnlessEqual(list(s), [(3,4)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 4)
+ self.failIf((0,1) in s)
+ self.failUnless((3,4) in s)
+ self.failUnless((3,1) in s)
+ self.failUnless((5,2) in s)
+ self.failUnless((6,1) in s)
+ self.failIf((6,2) in s)
+ self.failIf((7,1) in s)
+ self.failUnlessEqual(list(s.each()), [3,4,5,6])
+
+ def test_large(self):
+ s = Spans(4, 2**65) # don't do this with a SimpleSpans
+ self.failUnlessEqual(list(s), [(4, 2**65)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 2**65)
+ self.failIf((0,1) in s)
+ self.failUnless((4,2) in s)
+ self.failUnless((2**65,2) in s)
+
+ def test_math(self):
+ s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
+ s2 = Spans(5, 3) # 5,6,7
+ s3 = Spans(8, 4) # 8,9,10,11
+
+ s = s1 - s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = s1 - s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = s2 - s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s2
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s2 & s1
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s3
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s3 & s1
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s2 & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & s2
+ self.failUnlessEqual(list(s.each()), [])
+ s = Spans() & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & Spans()
+ self.failUnlessEqual(list(s.each()), [])
+
+ s = s1 + s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = s1 + s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = s2 + s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ s = Spans(s1)
+ s -= s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = Spans(s1)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = Spans(s2)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+
+ s = Spans(s1)
+ s += s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = Spans(s1)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = Spans(s2)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleSpans
+ S2 = Spans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, length); ns2.add(start, length)
+ return ns1, ns2
+
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "01234":
+ s1 = S1(); s2 = S2()
+ elif subop in "5678":
+ s1 = S1(start, length); s2 = S2(start, length)
+ else:
+ s1 = S1(s1); s2 = S2(s2)
+ #print "s2 = %s" % s2.dump()
+ elif op in "123":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, length); s2.add(start, length)
+ elif op in "456":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ elif op in "78":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 + %s" % ns2.dump()
+ s1 = s1 + ns1; s2 = s2 + ns2
+ elif op in "9a":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s - %s" % (s2.dump(), ns2.dump())
+ s1 = s1 - ns1; s2 = s2 - ns2
+ elif op in "bc":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 += %s" % ns2.dump()
+ s1 += ns1; s2 += ns2
+ elif op in "de":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s -= %s" % (s2.dump(), ns2.dump())
+ s1 -= ns1; s2 -= ns2
+ else:
+ ns1, ns2 = _create(what[7:11])
+ #print "%s &= %s" % (s2.dump(), ns2.dump())
+ s1 = s1 & ns1; s2 = s2 & ns2
+ #print "s2 now %s" % s2.dump()
+ self.failUnlessEqual(list(s1.each()), list(s2.each()))
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(bool(s1), bool(s2))
+ self.failUnlessEqual(list(s1), list(s2))
+ for j in range(10):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ span = (start, length)
+ self.failUnlessEqual(bool(span in s1), bool(span in s2))
+
+
+ # s()
+ # s(start,length)
+ # s(s0)
+ # s.add(start,length) : returns s
+ # s.remove(start,length)
+ # s.each() -> list of byte offsets, mostly for testing
+ # list(s) -> list of (start,length) tuples, one per span
+ # (start,length) in s -> True if (start..start+length-1) are all members
+ # NOT equivalent to x in list(s)
+ # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
+ # bool(s) (__nonzeron__)
+ # s = s1+s2, s1-s2, +=s1, -=s1
+
+ def test_overlap(self):
+ for a in range(20):
+ for b in range(10):
+ for c in range(20):
+ for d in range(10):
+ self._test_overlap(a,b,c,d)
+
+ def _test_overlap(self, a, b, c, d):
+ s1 = set(range(a,a+b))
+ s2 = set(range(c,c+d))
+ #print "---"
+ #self._show_overlap(s1, "1")
+ #self._show_overlap(s2, "2")
+ o = overlap(a,b,c,d)
+ expected = s1.intersection(s2)
+ if not expected:
+ self.failUnlessEqual(o, None)
+ else:
+ start,length = o
+ so = set(range(start,start+length))
+ #self._show(so, "o")
+ self.failUnlessEqual(so, expected)
+
+ def _show_overlap(self, s, c):
+ import sys
+ out = sys.stdout
+ if s:
+ for i in range(max(s)):
+ if i in s:
+ out.write(c)
+ else:
+ out.write(" ")
+ out.write("\n")
+
+def extend(s, start, length, fill):
+ if len(s) >= start+length:
+ return s
+ assert len(fill) == 1
+ return s + fill*(start+length-len(s))
+
+def replace(s, start, data):
+ assert len(s) >= start+len(data)
+ return s[:start] + data + s[start+len(data):]
+
+class SimpleDataSpans:
+ def __init__(self, other=None):
+ self.missing = "" # "1" where missing, "0" where found
+ self.data = ""
+ if other:
+ for (start, data) in other.get_chunks():
+ self.add(start, data)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+ def len(self):
+ return len(self.missing.replace("1", ""))
+ def _dump(self):
+ return [i for (i,c) in enumerate(self.missing) if c == "0"]
+ def _have(self, start, length):
+ m = self.missing[start:start+length]
+ if not m or len(m)<length or int(m):
+ return False
+ return True
+ def get_chunks(self):
+ for i in self._dump():
+ yield (i, self.data[i])
+ def get_spans(self):
+ return SimpleSpans([(start,len(data))
+ for (start,data) in self.get_chunks()])
+ def get(self, start, length):
+ if self._have(start, length):
+ return self.data[start:start+length]
+ return None
+ def pop(self, start, length):
+ data = self.get(start, length)
+ if data:
+ self.remove(start, length)
+ return data
+ def remove(self, start, length):
+ self.missing = replace(extend(self.missing, start, length, "1"),
+ start, "1"*length)
+ def add(self, start, data):
+ self.missing = replace(extend(self.missing, start, len(data), "1"),
+ start, "0"*len(data))
+ self.data = replace(extend(self.data, start, len(data), " "),
+ start, data)
+
+
+class StringSpans(unittest.TestCase):
+ def do_basic(self, klass):
+ ds = klass()
+ self.failUnlessEqual(ds.len(), 0)
+ self.failUnlessEqual(list(ds._dump()), [])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
+ s = ds.get_spans()
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ ds.remove(0, 4)
+
+ ds.add(2, "four")
+ self.failUnlessEqual(ds.len(), 4)
+ self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+ s = ds.get_spans()
+ self.failUnless((2,2) in s)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ self.failUnlessEqual(ds.get(4, 4), None)
+
+ ds2 = klass(ds)
+ self.failUnlessEqual(ds2.len(), 4)
+ self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
+ self.failUnlessEqual(ds2.get(0, 4), None)
+ self.failUnlessEqual(ds2.pop(0, 4), None)
+ self.failUnlessEqual(ds2.pop(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 1)
+ self.failUnlessEqual(ds2.get(2, 3), None)
+ self.failUnlessEqual(ds2.get(5, 1), "r")
+ self.failUnlessEqual(ds.get(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+
+ ds.add(0, "23")
+ self.failUnlessEqual(ds.len(), 6)
+ self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
+ self.failUnlessEqual(ds.get(0, 4), "23fo")
+ self.failUnlessEqual(ds.pop(0, 4), "23fo")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 2)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+
+ ds = klass()
+ ds.add(2, "four")
+ ds.add(3, "ea")
+ self.failUnlessEqual(ds.get(2, 4), "fear")
+
+ ds = klass()
+ ds.add(2L, "four")
+ ds.add(3L, "ea")
+ self.failUnlessEqual(ds.get(2L, 4L), "fear")
+
+
+ def do_scan(self, klass):
+ # do a test with gaps and spans of size 1 and 2
+ # left=(1,11) * right=(1,11) * gapsize=(1,2)
+ # 111, 112, 121, 122, 211, 212, 221, 222
+ # 211
+ # 121
+ # 112
+ # 212
+ # 222
+ # 221
+ # 111
+ # 122
+ # 11 1 1 11 11 11 1 1 111
+ # 0123456789012345678901234567
+ # abcdefghijklmnopqrstuvwxyz-=
+ pieces = [(1, "bc"),
+ (4, "e"),
+ (7, "h"),
+ (9, "jk"),
+ (12, "mn"),
+ (16, "qr"),
+ (20, "u"),
+ (22, "w"),
+ (25, "z-="),
+ ]
+ p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
+ S = "abcdefghijklmnopqrstuvwxyz-="
+ # TODO: when adding data, add capital letters, to make sure we aren't
+ # just leaving the old data in place
+ l = len(S)
+ def base():
+ ds = klass()
+ for start, data in pieces:
+ ds.add(start, data)
+ return ds
+ def dump(s):
+ p = set(s._dump())
+ d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
+ assert len(d) == l
+ return d
+ DEBUG = False
+ for start in range(0, l):
+ for end in range(start+1, l):
+ # add [start-end) to the baseline
+ which = "%d-%d" % (start, end-1)
+ p_added = set(range(start, end))
+ b = base()
+ if DEBUG:
+ print
+ print dump(b), which
+ add = klass(); add.add(start, S[start:end])
+ print dump(add)
+ b.add(start, S[start:end])
+ if DEBUG:
+ print dump(b)
+ # check that the new span is there
+ d = b.get(start, end-start)
+ self.failUnlessEqual(d, S[start:end], which)
+ # check that all the original pieces are still there
+ for t_start, t_data in pieces:
+ t_len = len(t_data)
+ self.failUnlessEqual(b.get(t_start, t_len),
+ S[t_start:t_start+t_len],
+ "%s %d+%d" % (which, t_start, t_len))
+ # check that a lot of subspans are mostly correct
+ for t_start in range(l):
+ for t_len in range(1,4):
+ d = b.get(t_start, t_len)
+ if d is not None:
+ which2 = "%s+(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(d, S[t_start:t_start+t_len],
+ which2)
+ # check that removing a subspan gives the right value
+ b2 = klass(b)
+ b2.remove(t_start, t_len)
+ removed = set(range(t_start, t_start+t_len))
+ for i in range(l):
+ exp = (((i in p_elements) or (i in p_added))
+ and (i not in removed))
+ which2 = "%s-(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(bool(b2.get(i, 1)), exp,
+ which2+" %d" % i)
+
+ def test_test(self):
+ self.do_basic(SimpleDataSpans)
+ self.do_scan(SimpleDataSpans)
+
+ def test_basic(self):
+ self.do_basic(DataSpans)
+ self.do_scan(DataSpans)
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleDataSpans
+ S2 = DataSpans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _randstr(length, seed):
+ created = 0
+ pieces = []
+ while created < length:
+ piece = _hash(seed + str(created)).hexdigest()
+ pieces.append(piece)
+ created += len(piece)
+ return "".join(pieces)[:length]
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, _randstr(length, what[7:9]));
+ ns2.add(start, _randstr(length, what[7:9]))
+ return ns1, ns2
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "0123456":
+ s1 = S1(); s2 = S2()
+ else:
+ s1, s2 = _create(what[7:11])
+ #print "s2 = %s" % list(s2._dump())
+ elif op in "123456":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, _randstr(length, what[7:9]));
+ s2.add(start, _randstr(length, what[7:9]))
+ elif op in "789abc":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ else:
+ #print "s2.pop(%d,%d)" % (start, length)
+ d1 = s1.pop(start, length); d2 = s2.pop(start, length)
+ self.failUnlessEqual(d1, d2)
+ #print "s1 now %s" % list(s1._dump())
+ #print "s2 now %s" % list(s2._dump())
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
+ for j in range(100):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ d1 = s1.get(start, length); d2 = s2.get(start, length)
+ self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))