X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Ftest%2Ftest_util.py;h=25b656156ee1cec51752e0f29b5a0ef98c7d1eac;hb=498563da69dddae86478343c66d437a467274522;hp=48d4711dc3f16bbc0a0cc8e69839f861be88ee0a;hpb=cbcb728e7ea0031d25738297f906a7f247fa2f75;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 48d4711d..25b65615 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -7,7 +7,7 @@ from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.python import log -from hashlib import md5 +from pycryptopp.hash.sha256 import SHA256 as _hash from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate @@ -15,6 +15,8 @@ from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log from allmydata.util.spans import Spans, overlap, DataSpans +from allmydata.test.common_util import ReallyEqualMixin, TimezoneMixin + class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): @@ -370,7 +372,7 @@ class Asserts(unittest.TestCase): m = self.should_assert(f, False, othermsg="message2") self.failUnlessEqual("postcondition: othermsg: 'message2' ", m) -class FileUtil(unittest.TestCase): +class FileUtil(ReallyEqualMixin, unittest.TestCase): def mkdir(self, basedir, path, mode=0777): fn = os.path.join(basedir, path) fileutil.make_dirs(fn, mode) @@ -420,33 +422,14 @@ class FileUtil(unittest.TestCase): fileutil.rm_dir(basedir) fileutil.remove_if_possible(fn) # should survive errors - def test_open_or_create(self): - basedir = "util/FileUtil/test_open_or_create" + def test_write_atomically(self): + basedir = "util/FileUtil/test_write_atomically" fileutil.make_dirs(basedir) fn = os.path.join(basedir, "here") - f = fileutil.open_or_create(fn) - f.write("stuff.") - f.close() - f = fileutil.open_or_create(fn) - f.seek(0, 2) - f.write("more.") - f.close() - f = open(fn, "r") - data = f.read() - f.close() - self.failUnlessEqual(data, "stuff.more.") - - def test_NamedTemporaryDirectory(self): - basedir = "util/FileUtil/test_NamedTemporaryDirectory" - fileutil.make_dirs(basedir) - td = fileutil.NamedTemporaryDirectory(dir=basedir) - name = td.name - self.failUnless(basedir in name) - self.failUnless(basedir in repr(td)) - self.failUnless(os.path.isdir(name)) - del td - # it is conceivable that we need to force gc here, but I'm not sure - self.failIf(os.path.isdir(name)) + fileutil.write_atomically(fn, "one") + self.failUnlessEqual(fileutil.read(fn), "one") + fileutil.write_atomically(fn, "two", mode="") # non-binary + self.failUnlessEqual(fileutil.read(fn), "two") def test_rename(self): basedir = "util/FileUtil/test_rename" @@ -479,11 +462,38 @@ class FileUtil(unittest.TestCase): abspath_cwd = fileutil.abspath_expanduser_unicode(u".") self.failUnless(isinstance(saved_cwd, unicode), saved_cwd) self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd) - self.failUnlessEqual(abspath_cwd, saved_cwd) + if sys.platform == "win32": + self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd)) + else: + self.failUnlessReallyEqual(abspath_cwd, saved_cwd) + + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar") # adapted from - self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo")) + foo = fileutil.abspath_expanduser_unicode(u"foo") + self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo) + + foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo) + self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar) + + if sys.platform == "win32": + # This is checking that a drive letter is added for a path without one. + baz = fileutil.abspath_expanduser_unicode(u"\\baz") + self.failUnless(baz.startswith(u"\\\\?\\"), baz) + self.failUnlessReallyEqual(baz[5 :], u":\\baz") + + bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz) + self.failUnless(bar.startswith(u"\\\\?\\"), bar) + self.failUnlessReallyEqual(bar[5 :], u":\\bar") + # not u":\\baz\\bar", because \bar is absolute on the current drive. + + self.failUnlessReallyEqual(baz[4], bar[4]) # same drive + self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~")) cwds = ['cwd'] @@ -503,6 +513,60 @@ class FileUtil(unittest.TestCase): finally: os.chdir(saved_cwd) + def test_create_long_path(self): + workdir = u"test_create_long_path" + fileutil.make_dirs(workdir) + long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255)) + def _cleanup(): + fileutil.remove(long_path) + self.addCleanup(_cleanup) + + fileutil.write(long_path, "test") + self.failUnless(os.path.exists(long_path)) + self.failUnlessEqual(fileutil.read(long_path), "test") + _cleanup() + self.failIf(os.path.exists(long_path)) + + def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None): + def call_windows_getenv(name): + if name == u"USERPROFILE": return userprofile + if name == u"HOMEDRIVE": return homedrive + if name == u"HOMEPATH": return homepath + self.fail("unexpected argument to call_windows_getenv") + self.patch(fileutil, 'windows_getenv', call_windows_getenv) + + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo") + + def test_windows_expanduser_xp(self): + return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100") + + def test_windows_expanduser_win7(self): + return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + + def test_disk_stats(self): + avail = fileutil.get_available_space('.', 2**14) + if avail == 0: + raise unittest.SkipTest("This test will spuriously fail there is no disk space left.") + + disk = fileutil.get_disk_stats('.', 2**13) + self.failUnless(disk['total'] > 0, disk['total']) + # we tolerate used==0 for a Travis-CI bug, see #2290 + self.failUnless(disk['used'] >= 0, disk['used']) + self.failUnless(disk['free_for_root'] > 0, disk['free_for_root']) + self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot']) + self.failUnless(disk['avail'] > 0, disk['avail']) + + def test_disk_stats_avail_nonnegative(self): + # This test will spuriously fail if you have more than 2^128 + # bytes of available space on your filesystem. + disk = fileutil.get_disk_stats('.', 2**128) + self.failUnlessEqual(disk['avail'], 0) + class PollMixinTests(unittest.TestCase): def setUp(self): self.pm = pollmixin.PollMixin() @@ -530,7 +594,7 @@ class PollMixinTests(unittest.TestCase): d.addCallbacks(_suc, _err) return d -class DeferredUtilTests(unittest.TestCase): +class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin): def test_gather_results(self): d1 = defer.Deferred() d2 = defer.Deferred() @@ -570,6 +634,21 @@ class DeferredUtilTests(unittest.TestCase): self.failUnless(isinstance(f, Failure)) self.failUnless(f.check(ValueError)) + def test_wait_for_delayed_calls(self): + """ + This tests that 'wait_for_delayed_calls' does in fact wait for a + delayed call that is active when the test returns. If it didn't, + Trial would report an unclean reactor error for this test. + """ + def _trigger(): + #print "trigger" + pass + reactor.callLater(0.1, _trigger) + + d = defer.succeed(None) + d.addBoth(self.wait_for_delayed_calls) + return d + class HashUtilTests(unittest.TestCase): def test_random_key(self): @@ -632,11 +711,11 @@ class HashUtilTests(unittest.TestCase): h2.update("foo") self.failUnlessEqual(h1, h2.digest()) - def test_constant_time_compare(self): - self.failUnless(hashutil.constant_time_compare("a", "a")) - self.failUnless(hashutil.constant_time_compare("ab", "ab")) - self.failIf(hashutil.constant_time_compare("a", "b")) - self.failIf(hashutil.constant_time_compare("a", "aa")) + def test_timing_safe_compare(self): + self.failUnless(hashutil.timing_safe_compare("a", "a")) + self.failUnless(hashutil.timing_safe_compare("ab", "ab")) + self.failIf(hashutil.timing_safe_compare("a", "b")) + self.failIf(hashutil.timing_safe_compare("a", "aa")) def _testknown(self, hashf, expected_a, *args): got = hashf(*args) @@ -706,7 +785,8 @@ class Abbreviate(unittest.TestCase): (1000*1000*1000, "1.00 GB"), (1000*1000*1000*1000, "1.00 TB"), (1000*1000*1000*1000*1000, "1.00 PB"), - (1234567890123456, "1.23 PB"), + (1000*1000*1000*1000*1000*1000, "1.00 EB"), + (1234567890123456789, "1.23 EB"), ] for (x, expected) in tests_si: got = abbreviate.abbreviate_space(x, SI=True) @@ -726,7 +806,8 @@ class Abbreviate(unittest.TestCase): (1024*1024*1024*1024, "1.00 TiB"), (1000*1000*1000*1000*1000, "909.49 TiB"), (1024*1024*1024*1024*1024, "1.00 PiB"), - (1234567890123456, "1.10 PiB"), + (1024*1024*1024*1024*1024*1024, "1.00 EiB"), + (1234567890123456789, "1.07 EiB"), ] for (x, expected) in tests_base1024: got = abbreviate.abbreviate_space(x, SI=False) @@ -748,8 +829,19 @@ class Abbreviate(unittest.TestCase): self.failUnlessEqual(p("10MiB"), 10*1024*1024) self.failUnlessEqual(p("5G"), 5*1000*1000*1000) self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024) + self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000) + self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024) + self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000) + self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024) + self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000) + self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024) + e = self.failUnlessRaises(ValueError, p, "12 cubits") - self.failUnless("12 cubits" in str(e)) + self.failUnlessIn("12 cubits", str(e)) + e = self.failUnlessRaises(ValueError, p, "1 BB") + self.failUnlessIn("1 BB", str(e)) + e = self.failUnlessRaises(ValueError, p, "fhtagn") + self.failUnlessIn("fhtagn", str(e)) class Limiter(unittest.TestCase): timeout = 480 # This takes longer than 240 seconds on Francois's arm box. @@ -826,7 +918,7 @@ class Limiter(unittest.TestCase): d.addCallback(_all_done) return d -class TimeFormat(unittest.TestCase): +class TimeFormat(unittest.TestCase, TimezoneMixin): def test_epoch(self): return self._help_test_epoch() @@ -840,19 +932,8 @@ class TimeFormat(unittest.TestCase): # time_format.iso_utc_time_to_localseconds() breaks if the timezone is # Europe/London. (As soon as this unit test is done then I'll change # that implementation to something that works even in this case...) - origtz = os.environ.get('TZ') - os.environ['TZ'] = "Europe/London" - if hasattr(time, 'tzset'): - time.tzset() - try: - return self._help_test_epoch() - finally: - if origtz is None: - del os.environ['TZ'] - else: - os.environ['TZ'] = origtz - if hasattr(time, 'tzset'): - time.tzset() + self.setTimezone("Europe/London") + return self._help_test_epoch() def _help_test_epoch(self): origtzname = time.tzname @@ -914,6 +995,55 @@ class TimeFormat(unittest.TestCase): def test_parse_date(self): self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400) + def test_format_time(self): + self.failUnlessEqual(time_format.format_time(time.gmtime(0)), '1970-01-01 00:00:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60)), '1970-01-01 00:01:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60*60)), '1970-01-01 01:00:00') + seconds_per_day = 60*60*24 + leap_years_1970_to_2014_inclusive = ((2012 - 1968) // 4) + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2015 - 1970)*365+leap_years_1970_to_2014_inclusive))), '2015-01-01 00:00:00') + + def test_format_time_y2038(self): + seconds_per_day = 60*60*24 + leap_years_1970_to_2047_inclusive = ((2044 - 1968) // 4) + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2048 - 1970)*365+leap_years_1970_to_2047_inclusive))), '2048-01-01 00:00:00') + + test_format_time_y2038.todo = "This test is known to fail on systems with 32-bit time_t." + + def test_format_delta(self): + time_1 = 1389812723 + time_5s_delta = 1389812728 + time_28m7s_delta = 1389814410 + time_1h_delta = 1389816323 + time_1d21h46m49s_delta = 1389977532 + + self.failUnlessEqual( + time_format.format_delta(time_1, time_1), '0s') + + self.failUnlessEqual( + time_format.format_delta(time_1, time_5s_delta), '5s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_28m7s_delta), '28m 7s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1h_delta), '1h 0m 0s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1d21h46m49s_delta), '1d 21h 46m 49s') + + self.failUnlessEqual( + time_format.format_delta(time_1d21h46m49s_delta, time_1), '-') + + # time_1 with a decimal fraction will make the delta 1s less + time_1decimal = 1389812723.383963 + + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_5s_delta), '4s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_28m7s_delta), '28m 6s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1h_delta), '59m 59s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1d21h46m49s_delta), '1d 21h 46m 48s') + class CacheDir(unittest.TestCase): def test_basic(self): basedir = "test_util/CacheDir/test_basic" @@ -1092,16 +1222,13 @@ class DictUtil(unittest.TestCase): ds.discard(2, "c") self.failIf(2 in ds) - ds.union(1, ["a", "e"]) - ds.union(3, ["f"]) - self.failUnlessEqual(ds[1], set(["a","e"])) - self.failUnlessEqual(ds[3], set(["f"])) + ds.add(3, "f") ds2 = dictutil.DictOfSets() ds2.add(3, "f") ds2.add(3, "g") ds2.add(4, "h") ds.update(ds2) - self.failUnlessEqual(ds[1], set(["a","e"])) + self.failUnlessEqual(ds[1], set(["a"])) self.failUnlessEqual(ds[3], set(["f", "g"])) self.failUnlessEqual(ds[4], set(["h"])) @@ -1614,8 +1741,10 @@ class SimpleSpans: if prevstart is not None: yield (prevstart, prevend-prevstart+1) - def __len__(self): - # this also gets us bool(s) + def __nonzero__(self): # this gets us bool() + return self.len() + + def len(self): return len(self._have) def __add__(self, other): @@ -1659,11 +1788,14 @@ class ByteSpans(unittest.TestCase): self.failUnlessEqual(list(s), []) self.failIf(s) self.failIf((0,1) in s) - self.failUnlessEqual(len(s), 0) + self.failUnlessEqual(s.len(), 0) s1 = Spans(3, 4) # 3,4,5,6 self._check1(s1) + s1 = Spans(3L, 4L) # 3,4,5,6 + self._check1(s1) + s2 = Spans(s1) self._check1(s2) @@ -1672,15 +1804,15 @@ class ByteSpans(unittest.TestCase): self.failUnless((10,1) in s2) self.failIf((10,1) in s1) self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) - self.failUnlessEqual(len(s2), 6) + self.failUnlessEqual(s2.len(), 6) s2.add(15,2).add(20,2) self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) - self.failUnlessEqual(len(s2), 10) + self.failUnlessEqual(s2.len(), 10) s2.remove(4,3).remove(15,1) self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) - self.failUnlessEqual(len(s2), 6) + self.failUnlessEqual(s2.len(), 6) s1 = SimpleSpans(3, 4) # 3 4 5 6 s2 = SimpleSpans(5, 4) # 5 6 7 8 @@ -1690,7 +1822,7 @@ class ByteSpans(unittest.TestCase): def _check1(self, s): self.failUnlessEqual(list(s), [(3,4)]) self.failUnless(s) - self.failUnlessEqual(len(s), 4) + self.failUnlessEqual(s.len(), 4) self.failIf((0,1) in s) self.failUnless((3,4) in s) self.failUnless((3,1) in s) @@ -1700,6 +1832,15 @@ class ByteSpans(unittest.TestCase): self.failIf((7,1) in s) self.failUnlessEqual(list(s.each()), [3,4,5,6]) + def test_large(self): + s = Spans(4, 2**65) # don't do this with a SimpleSpans + self.failUnlessEqual(list(s), [(4, 2**65)]) + self.failUnless(s) + self.failUnlessEqual(s.len(), 2**65) + self.failIf((0,1) in s) + self.failUnless((4,2) in s) + self.failUnless((2**65,2) in s) + def test_math(self): s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 s2 = Spans(5, 3) # 5,6,7 @@ -1767,7 +1908,7 @@ class ByteSpans(unittest.TestCase): def _create(subseed): ns1 = S1(); ns2 = S2() for i in range(10): - what = md5(subseed+str(i)).hexdigest() + what = _hash(subseed+str(i)).hexdigest() start = int(what[2:4], 16) length = max(1,int(what[5:6], 16)) ns1.add(start, length); ns2.add(start, length) @@ -1775,7 +1916,7 @@ class ByteSpans(unittest.TestCase): #print for i in range(1000): - what = md5(seed+str(i)).hexdigest() + what = _hash(seed+str(i)).hexdigest() op = what[0] subop = what[1] start = int(what[2:4], 16) @@ -1817,11 +1958,11 @@ class ByteSpans(unittest.TestCase): s1 = s1 & ns1; s2 = s2 & ns2 #print "s2 now %s" % s2.dump() self.failUnlessEqual(list(s1.each()), list(s2.each())) - self.failUnlessEqual(len(s1), len(s2)) + self.failUnlessEqual(s1.len(), s2.len()) self.failUnlessEqual(bool(s1), bool(s2)) self.failUnlessEqual(list(s1), list(s2)) for j in range(10): - what = md5(what[12:14]+str(j)).hexdigest() + what = _hash(what[12:14]+str(j)).hexdigest() start = int(what[2:4], 16) length = max(1, int(what[5:6], 16)) span = (start, length) @@ -1837,8 +1978,8 @@ class ByteSpans(unittest.TestCase): # list(s) -> list of (start,length) tuples, one per span # (start,length) in s -> True if (start..start+length-1) are all members # NOT equivalent to x in list(s) - # len(s) -> number of bytes, for testing, bool(), and accounting/limiting - # bool(s) (__len__) + # s.len() -> number of bytes, for testing, bool(), and accounting/limiting + # bool(s) (__nonzeron__) # s = s1+s2, s1-s2, +=s1, -=s1 def test_overlap(self): @@ -1893,8 +2034,10 @@ class SimpleDataSpans: for (start, data) in other.get_chunks(): self.add(start, data) - def __len__(self): - return len(self.missing.translate(None, "1")) + def __nonzero__(self): # this gets us bool() + return self.len() + def len(self): + return len(self.missing.replace("1", "")) def _dump(self): return [i for (i,c) in enumerate(self.missing) if c == "0"] def _have(self, start, length): @@ -1930,26 +2073,26 @@ class SimpleDataSpans: class StringSpans(unittest.TestCase): def do_basic(self, klass): ds = klass() - self.failUnlessEqual(len(ds), 0) + self.failUnlessEqual(ds.len(), 0) self.failUnlessEqual(list(ds._dump()), []) self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0) - s = ds.get_spans() + s1 = ds.get_spans() self.failUnlessEqual(ds.get(0, 4), None) self.failUnlessEqual(ds.pop(0, 4), None) ds.remove(0, 4) ds.add(2, "four") - self.failUnlessEqual(len(ds), 4) + self.failUnlessEqual(ds.len(), 4) self.failUnlessEqual(list(ds._dump()), [2,3,4,5]) self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4) - s = ds.get_spans() - self.failUnless((2,2) in s) + s1 = ds.get_spans() + self.failUnless((2,2) in s1) self.failUnlessEqual(ds.get(0, 4), None) self.failUnlessEqual(ds.pop(0, 4), None) self.failUnlessEqual(ds.get(4, 4), None) ds2 = klass(ds) - self.failUnlessEqual(len(ds2), 4) + self.failUnlessEqual(ds2.len(), 4) self.failUnlessEqual(list(ds2._dump()), [2,3,4,5]) self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4) self.failUnlessEqual(ds2.get(0, 4), None) @@ -1962,7 +2105,7 @@ class StringSpans(unittest.TestCase): self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4) ds.add(0, "23") - self.failUnlessEqual(len(ds), 6) + self.failUnlessEqual(ds.len(), 6) self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5]) self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6) self.failUnlessEqual(ds.get(0, 4), "23fo") @@ -1976,6 +2119,12 @@ class StringSpans(unittest.TestCase): ds.add(3, "ea") self.failUnlessEqual(ds.get(2, 4), "fear") + ds = klass() + ds.add(2L, "four") + ds.add(3L, "ea") + self.failUnlessEqual(ds.get(2L, 4L), "fear") + + def do_scan(self, klass): # do a test with gaps and spans of size 1 and 2 # left=(1,11) * right=(1,11) * gapsize=(1,2) @@ -2013,9 +2162,7 @@ class StringSpans(unittest.TestCase): return ds def dump(s): p = set(s._dump()) - # wow, this is the first time I've ever wanted ?: in python - # note: this requires python2.5 - d = "".join([(S[i] if i in p else " ") for i in range(l)]) + d = "".join([((i not in p) and " " or S[i]) for i in range(l)]) assert len(d) == l return d DEBUG = False @@ -2084,14 +2231,14 @@ class StringSpans(unittest.TestCase): created = 0 pieces = [] while created < length: - piece = md5(seed + str(created)).hexdigest() + piece = _hash(seed + str(created)).hexdigest() pieces.append(piece) created += len(piece) return "".join(pieces)[:length] def _create(subseed): ns1 = S1(); ns2 = S2() for i in range(10): - what = md5(subseed+str(i)).hexdigest() + what = _hash(subseed+str(i)).hexdigest() start = int(what[2:4], 16) length = max(1,int(what[5:6], 16)) ns1.add(start, _randstr(length, what[7:9])); @@ -2100,7 +2247,7 @@ class StringSpans(unittest.TestCase): #print for i in range(1000): - what = md5(seed+str(i)).hexdigest() + what = _hash(seed+str(i)).hexdigest() op = what[0] subop = what[1] start = int(what[2:4], 16) @@ -2125,10 +2272,10 @@ class StringSpans(unittest.TestCase): self.failUnlessEqual(d1, d2) #print "s1 now %s" % list(s1._dump()) #print "s2 now %s" % list(s2._dump()) - self.failUnlessEqual(len(s1), len(s2)) + self.failUnlessEqual(s1.len(), s2.len()) self.failUnlessEqual(list(s1._dump()), list(s2._dump())) for j in range(100): - what = md5(what[12:14]+str(j)).hexdigest() + what = _hash(what[12:14]+str(j)).hexdigest() start = int(what[2:4], 16) length = max(1, int(what[5:6], 16)) d1 = s1.get(start, length); d2 = s2.get(start, length)