X-Git-Url: https://git.rkrishnan.org/?a=blobdiff_plain;f=src%2Fallmydata%2Ftest%2Ftest_util.py;h=e59e14aba6e92c4625896e24c6a20f015d38932e;hb=186f6d4a59a879ee93207a48bca15fbdf2741206;hp=f66d55f637d2bc898e8c8ef367b1f89b7132e92c;hpb=731d15e56f509f9a1aafc56182d85f30d0fcb801;p=tahoe-lafs%2Ftahoe-lafs.git diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index f66d55f6..e59e14ab 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -1,18 +1,22 @@ def foo(): pass # keep the line number constant -import os, time +import os, time, sys from StringIO import StringIO from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python.failure import Failure from twisted.python import log +from pycryptopp.hash.sha256 import SHA256 as _hash from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil from allmydata.util import assertutil, fileutil, deferredutil, abbreviate from allmydata.util import limiter, time_format, pollmixin, cachedir from allmydata.util import statistics, dictutil, pipeline from allmydata.util import log as tahoe_log +from allmydata.util.spans import Spans, overlap, DataSpans +from allmydata.test.common_util import ReallyEqualMixin, TimezoneMixin + class Base32(unittest.TestCase): def test_b2a_matches_Pythons(self): @@ -368,7 +372,7 @@ class Asserts(unittest.TestCase): m = self.should_assert(f, False, othermsg="message2") self.failUnlessEqual("postcondition: othermsg: 'message2' ", m) -class FileUtil(unittest.TestCase): +class FileUtil(ReallyEqualMixin, unittest.TestCase): def mkdir(self, basedir, path, mode=0777): fn = os.path.join(basedir, path) fileutil.make_dirs(fn, mode) @@ -418,33 +422,14 @@ class FileUtil(unittest.TestCase): fileutil.rm_dir(basedir) fileutil.remove_if_possible(fn) # should survive errors - def test_open_or_create(self): - basedir = "util/FileUtil/test_open_or_create" + def test_write_atomically(self): + basedir = "util/FileUtil/test_write_atomically" fileutil.make_dirs(basedir) fn = os.path.join(basedir, "here") - f = fileutil.open_or_create(fn) - f.write("stuff.") - f.close() - f = fileutil.open_or_create(fn) - f.seek(0, 2) - f.write("more.") - f.close() - f = open(fn, "r") - data = f.read() - f.close() - self.failUnlessEqual(data, "stuff.more.") - - def test_NamedTemporaryDirectory(self): - basedir = "util/FileUtil/test_NamedTemporaryDirectory" - fileutil.make_dirs(basedir) - td = fileutil.NamedTemporaryDirectory(dir=basedir) - name = td.name - self.failUnless(basedir in name) - self.failUnless(basedir in repr(td)) - self.failUnless(os.path.isdir(name)) - del td - # it is conceivable that we need to force gc here, but I'm not sure - self.failIf(os.path.isdir(name)) + fileutil.write_atomically(fn, "one") + self.failUnlessEqual(fileutil.read(fn), "one") + fileutil.write_atomically(fn, "two", mode="") # non-binary + self.failUnlessEqual(fileutil.read(fn), "two") def test_rename(self): basedir = "util/FileUtil/test_rename" @@ -470,6 +455,118 @@ class FileUtil(unittest.TestCase): used = fileutil.du(basedir) self.failUnlessEqual(10+11+12+13, used) + def test_abspath_expanduser_unicode(self): + self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, "bytestring") + + saved_cwd = os.path.normpath(os.getcwdu()) + abspath_cwd = fileutil.abspath_expanduser_unicode(u".") + self.failUnless(isinstance(saved_cwd, unicode), saved_cwd) + self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd) + if sys.platform == "win32": + self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd)) + else: + self.failUnlessReallyEqual(abspath_cwd, saved_cwd) + + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo") + self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar") + + # adapted from + + foo = fileutil.abspath_expanduser_unicode(u"foo") + self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo) + + foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo) + self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar) + + if sys.platform == "win32": + # This is checking that a drive letter is added for a path without one. + baz = fileutil.abspath_expanduser_unicode(u"\\baz") + self.failUnless(baz.startswith(u"\\\\?\\"), baz) + self.failUnlessReallyEqual(baz[5 :], u":\\baz") + + bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz) + self.failUnless(bar.startswith(u"\\\\?\\"), bar) + self.failUnlessReallyEqual(bar[5 :], u":\\bar") + # not u":\\baz\\bar", because \bar is absolute on the current drive. + + self.failUnlessReallyEqual(baz[4], bar[4]) # same drive + + self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~")) + + cwds = ['cwd'] + try: + cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding() + or 'ascii')) + except UnicodeEncodeError: + pass # the cwd can't be encoded -- test with ascii cwd only + + for cwd in cwds: + try: + os.mkdir(cwd) + os.chdir(cwd) + for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'): + uabspath = fileutil.abspath_expanduser_unicode(upath) + self.failUnless(isinstance(uabspath, unicode), uabspath) + finally: + os.chdir(saved_cwd) + + def test_create_long_path(self): + workdir = u"test_create_long_path" + fileutil.make_dirs(workdir) + long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255)) + def _cleanup(): + fileutil.remove(long_path) + self.addCleanup(_cleanup) + + fileutil.write(long_path, "test") + self.failUnless(os.path.exists(long_path)) + self.failUnlessEqual(fileutil.read(long_path), "test") + _cleanup() + self.failIf(os.path.exists(long_path)) + + def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None): + def call_windows_getenv(name): + if name == u"USERPROFILE": return userprofile + if name == u"HOMEDRIVE": return homedrive + if name == u"HOMEPATH": return homepath + self.fail("unexpected argument to call_windows_getenv") + self.patch(fileutil, 'windows_getenv', call_windows_getenv) + + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo")) + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~") + self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo") + + def test_windows_expanduser_xp(self): + return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100") + + def test_windows_expanduser_win7(self): + return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100")) + + def test_disk_stats(self): + avail = fileutil.get_available_space('.', 2**14) + if avail == 0: + raise unittest.SkipTest("This test will spuriously fail there is no disk space left.") + + disk = fileutil.get_disk_stats('.', 2**13) + self.failUnless(disk['total'] > 0, disk['total']) + # we tolerate used==0 for a Travis-CI bug, see #2290 + self.failUnless(disk['used'] >= 0, disk['used']) + self.failUnless(disk['free_for_root'] > 0, disk['free_for_root']) + self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot']) + self.failUnless(disk['avail'] > 0, disk['avail']) + + def test_disk_stats_avail_nonnegative(self): + # This test will spuriously fail if you have more than 2^128 + # bytes of available space on your filesystem. + disk = fileutil.get_disk_stats('.', 2**128) + self.failUnlessEqual(disk['avail'], 0) + class PollMixinTests(unittest.TestCase): def setUp(self): self.pm = pollmixin.PollMixin() @@ -497,7 +594,7 @@ class PollMixinTests(unittest.TestCase): d.addCallbacks(_suc, _err) return d -class DeferredUtilTests(unittest.TestCase): +class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin): def test_gather_results(self): d1 = defer.Deferred() d2 = defer.Deferred() @@ -537,6 +634,21 @@ class DeferredUtilTests(unittest.TestCase): self.failUnless(isinstance(f, Failure)) self.failUnless(f.check(ValueError)) + def test_wait_for_delayed_calls(self): + """ + This tests that 'wait_for_delayed_calls' does in fact wait for a + delayed call that is active when the test returns. If it didn't, + Trial would report an unclean reactor error for this test. + """ + def _trigger(): + #print "trigger" + pass + reactor.callLater(0.1, _trigger) + + d = defer.succeed(None) + d.addBoth(self.wait_for_delayed_calls) + return d + class HashUtilTests(unittest.TestCase): def test_random_key(self): @@ -599,11 +711,11 @@ class HashUtilTests(unittest.TestCase): h2.update("foo") self.failUnlessEqual(h1, h2.digest()) - def test_constant_time_compare(self): - self.failUnless(hashutil.constant_time_compare("a", "a")) - self.failUnless(hashutil.constant_time_compare("ab", "ab")) - self.failIf(hashutil.constant_time_compare("a", "b")) - self.failIf(hashutil.constant_time_compare("a", "aa")) + def test_timing_safe_compare(self): + self.failUnless(hashutil.timing_safe_compare("a", "a")) + self.failUnless(hashutil.timing_safe_compare("ab", "ab")) + self.failIf(hashutil.timing_safe_compare("a", "b")) + self.failIf(hashutil.timing_safe_compare("a", "aa")) def _testknown(self, hashf, expected_a, *args): got = hashf(*args) @@ -673,7 +785,8 @@ class Abbreviate(unittest.TestCase): (1000*1000*1000, "1.00 GB"), (1000*1000*1000*1000, "1.00 TB"), (1000*1000*1000*1000*1000, "1.00 PB"), - (1234567890123456, "1.23 PB"), + (1000*1000*1000*1000*1000*1000, "1.00 EB"), + (1234567890123456789, "1.23 EB"), ] for (x, expected) in tests_si: got = abbreviate.abbreviate_space(x, SI=True) @@ -693,7 +806,8 @@ class Abbreviate(unittest.TestCase): (1024*1024*1024*1024, "1.00 TiB"), (1000*1000*1000*1000*1000, "909.49 TiB"), (1024*1024*1024*1024*1024, "1.00 PiB"), - (1234567890123456, "1.10 PiB"), + (1024*1024*1024*1024*1024*1024, "1.00 EiB"), + (1234567890123456789, "1.07 EiB"), ] for (x, expected) in tests_base1024: got = abbreviate.abbreviate_space(x, SI=False) @@ -715,8 +829,19 @@ class Abbreviate(unittest.TestCase): self.failUnlessEqual(p("10MiB"), 10*1024*1024) self.failUnlessEqual(p("5G"), 5*1000*1000*1000) self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024) + self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000) + self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024) + self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000) + self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024) + self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000) + self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024) + e = self.failUnlessRaises(ValueError, p, "12 cubits") - self.failUnless("12 cubits" in str(e)) + self.failUnlessIn("12 cubits", str(e)) + e = self.failUnlessRaises(ValueError, p, "1 BB") + self.failUnlessIn("1 BB", str(e)) + e = self.failUnlessRaises(ValueError, p, "fhtagn") + self.failUnlessIn("fhtagn", str(e)) class Limiter(unittest.TestCase): timeout = 480 # This takes longer than 240 seconds on Francois's arm box. @@ -793,7 +918,7 @@ class Limiter(unittest.TestCase): d.addCallback(_all_done) return d -class TimeFormat(unittest.TestCase): +class TimeFormat(unittest.TestCase, TimezoneMixin): def test_epoch(self): return self._help_test_epoch() @@ -807,19 +932,12 @@ class TimeFormat(unittest.TestCase): # time_format.iso_utc_time_to_localseconds() breaks if the timezone is # Europe/London. (As soon as this unit test is done then I'll change # that implementation to something that works even in this case...) - origtz = os.environ.get('TZ') - os.environ['TZ'] = "Europe/London" - if hasattr(time, 'tzset'): - time.tzset() - try: - return self._help_test_epoch() - finally: - if origtz is None: - del os.environ['TZ'] - else: - os.environ['TZ'] = origtz - if hasattr(time, 'tzset'): - time.tzset() + + if not self.have_working_tzset(): + raise unittest.SkipTest("This test can't be run on a platform without time.tzset().") + + self.setTimezone("Europe/London") + return self._help_test_epoch() def _help_test_epoch(self): origtzname = time.tzname @@ -855,6 +973,82 @@ class TimeFormat(unittest.TestCase): self.failUnlessEqual(thatmomentinmarch, 1237585742.226536) self.failUnlessEqual(origtzname, time.tzname) + def test_iso_utc(self): + when = 1266760143.7841301 + out = time_format.iso_utc_date(when) + self.failUnlessEqual(out, "2010-02-21") + out = time_format.iso_utc_date(t=lambda: when) + self.failUnlessEqual(out, "2010-02-21") + out = time_format.iso_utc(when) + self.failUnlessEqual(out, "2010-02-21_13:49:03.784130") + out = time_format.iso_utc(when, sep="-") + self.failUnlessEqual(out, "2010-02-21-13:49:03.784130") + + def test_parse_duration(self): + p = time_format.parse_duration + DAY = 24*60*60 + self.failUnlessEqual(p("1 day"), DAY) + self.failUnlessEqual(p("2 days"), 2*DAY) + self.failUnlessEqual(p("3 months"), 3*31*DAY) + self.failUnlessEqual(p("4 mo"), 4*31*DAY) + self.failUnlessEqual(p("5 years"), 5*365*DAY) + e = self.failUnlessRaises(ValueError, p, "123") + self.failUnlessIn("no unit (like day, month, or year) in '123'", + str(e)) + + def test_parse_date(self): + self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400) + + def test_format_time(self): + self.failUnlessEqual(time_format.format_time(time.gmtime(0)), '1970-01-01 00:00:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60)), '1970-01-01 00:01:00') + self.failUnlessEqual(time_format.format_time(time.gmtime(60*60)), '1970-01-01 01:00:00') + seconds_per_day = 60*60*24 + leap_years_1970_to_2014_inclusive = ((2012 - 1968) // 4) + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2015 - 1970)*365+leap_years_1970_to_2014_inclusive))), '2015-01-01 00:00:00') + + def test_format_time_y2038(self): + seconds_per_day = 60*60*24 + leap_years_1970_to_2047_inclusive = ((2044 - 1968) // 4) + try: + self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2048 - 1970)*365+leap_years_1970_to_2047_inclusive))), '2048-01-01 00:00:00') + except unittest.FailTest: + raise unittest.SkipTest("Note: this system cannot handle dates after 2037.") + + def test_format_delta(self): + time_1 = 1389812723 + time_5s_delta = 1389812728 + time_28m7s_delta = 1389814410 + time_1h_delta = 1389816323 + time_1d21h46m49s_delta = 1389977532 + + self.failUnlessEqual( + time_format.format_delta(time_1, time_1), '0s') + + self.failUnlessEqual( + time_format.format_delta(time_1, time_5s_delta), '5s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_28m7s_delta), '28m 7s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1h_delta), '1h 0m 0s') + self.failUnlessEqual( + time_format.format_delta(time_1, time_1d21h46m49s_delta), '1d 21h 46m 49s') + + self.failUnlessEqual( + time_format.format_delta(time_1d21h46m49s_delta, time_1), '-') + + # time_1 with a decimal fraction will make the delta 1s less + time_1decimal = 1389812723.383963 + + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_5s_delta), '4s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_28m7s_delta), '28m 6s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1h_delta), '59m 59s') + self.failUnlessEqual( + time_format.format_delta(time_1decimal, time_1d21h46m49s_delta), '1d 21h 46m 48s') + class CacheDir(unittest.TestCase): def test_basic(self): basedir = "test_util/CacheDir/test_basic" @@ -917,6 +1111,7 @@ class CacheDir(unittest.TestCase): _failIfExists("a") _failUnlessExists("b") _failUnlessExists("c") + del b2 ctr = [0] class EqButNotIs: @@ -1032,16 +1227,13 @@ class DictUtil(unittest.TestCase): ds.discard(2, "c") self.failIf(2 in ds) - ds.union(1, ["a", "e"]) - ds.union(3, ["f"]) - self.failUnlessEqual(ds[1], set(["a","e"])) - self.failUnlessEqual(ds[3], set(["f"])) + ds.add(3, "f") ds2 = dictutil.DictOfSets() ds2.add(3, "f") ds2.add(3, "g") ds2.add(4, "h") ds.update(ds2) - self.failUnlessEqual(ds[1], set(["a","e"])) + self.failUnlessEqual(ds[1], set(["a"])) self.failUnlessEqual(ds[3], set(["f", "g"])) self.failUnlessEqual(ds[4], set(["h"])) @@ -1491,6 +1683,8 @@ class Pipeline(unittest.TestCase): self.calls[1][0].callback("two-result") self.calls[2][0].errback(ValueError("three-error")) + del d1,d2,d3,d4 + class SampleError(Exception): pass @@ -1508,3 +1702,586 @@ class Log(unittest.TestCase): tahoe_log.err(format="intentional sample error", failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") self.flushLoggedErrors(SampleError) + + +class SimpleSpans: + # this is a simple+inefficient form of util.spans.Spans . We compare the + # behavior of this reference model against the real (efficient) form. + + def __init__(self, _span_or_start=None, length=None): + self._have = set() + if length is not None: + for i in range(_span_or_start, _span_or_start+length): + self._have.add(i) + elif _span_or_start: + for (start,length) in _span_or_start: + self.add(start, length) + + def add(self, start, length): + for i in range(start, start+length): + self._have.add(i) + return self + + def remove(self, start, length): + for i in range(start, start+length): + self._have.discard(i) + return self + + def each(self): + return sorted(self._have) + + def __iter__(self): + items = sorted(self._have) + prevstart = None + prevend = None + for i in items: + if prevstart is None: + prevstart = prevend = i + continue + if i == prevend+1: + prevend = i + continue + yield (prevstart, prevend-prevstart+1) + prevstart = prevend = i + if prevstart is not None: + yield (prevstart, prevend-prevstart+1) + + def __nonzero__(self): # this gets us bool() + return self.len() + + def len(self): + return len(self._have) + + def __add__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.add(start, length) + return s + + def __sub__(self, other): + s = self.__class__(self) + for (start, length) in other: + s.remove(start, length) + return s + + def __iadd__(self, other): + for (start, length) in other: + self.add(start, length) + return self + + def __isub__(self, other): + for (start, length) in other: + self.remove(start, length) + return self + + def __and__(self, other): + s = self.__class__() + for i in other.each(): + if i in self._have: + s.add(i, 1) + return s + + def __contains__(self, (start,length)): + for i in range(start, start+length): + if i not in self._have: + return False + return True + +class ByteSpans(unittest.TestCase): + def test_basic(self): + s = Spans() + self.failUnlessEqual(list(s), []) + self.failIf(s) + self.failIf((0,1) in s) + self.failUnlessEqual(s.len(), 0) + + s1 = Spans(3, 4) # 3,4,5,6 + self._check1(s1) + + s1 = Spans(3L, 4L) # 3,4,5,6 + self._check1(s1) + + s2 = Spans(s1) + self._check1(s2) + + s2.add(10,2) # 10,11 + self._check1(s1) + self.failUnless((10,1) in s2) + self.failIf((10,1) in s1) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) + self.failUnlessEqual(s2.len(), 6) + + s2.add(15,2).add(20,2) + self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) + self.failUnlessEqual(s2.len(), 10) + + s2.remove(4,3).remove(15,1) + self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) + self.failUnlessEqual(s2.len(), 6) + + s1 = SimpleSpans(3, 4) # 3 4 5 6 + s2 = SimpleSpans(5, 4) # 5 6 7 8 + i = s1 & s2 + self.failUnlessEqual(list(i.each()), [5, 6]) + + def _check1(self, s): + self.failUnlessEqual(list(s), [(3,4)]) + self.failUnless(s) + self.failUnlessEqual(s.len(), 4) + self.failIf((0,1) in s) + self.failUnless((3,4) in s) + self.failUnless((3,1) in s) + self.failUnless((5,2) in s) + self.failUnless((6,1) in s) + self.failIf((6,2) in s) + self.failIf((7,1) in s) + self.failUnlessEqual(list(s.each()), [3,4,5,6]) + + def test_large(self): + s = Spans(4, 2**65) # don't do this with a SimpleSpans + self.failUnlessEqual(list(s), [(4, 2**65)]) + self.failUnless(s) + self.failUnlessEqual(s.len(), 2**65) + self.failIf((0,1) in s) + self.failUnless((4,2) in s) + self.failUnless((2**65,2) in s) + + def test_math(self): + s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 + s2 = Spans(5, 3) # 5,6,7 + s3 = Spans(8, 4) # 8,9,10,11 + + s = s1 - s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = s1 - s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = s2 - s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s2 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s2 & s1 + self.failUnlessEqual(list(s.each()), [5,6,7]) + s = s1 & s3 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s3 & s1 + self.failUnlessEqual(list(s.each()), [8,9]) + s = s2 & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & s2 + self.failUnlessEqual(list(s.each()), []) + s = Spans() & s3 + self.failUnlessEqual(list(s.each()), []) + s = s3 & Spans() + self.failUnlessEqual(list(s.each()), []) + + s = s1 + s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = s1 + s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = s2 + s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + s = Spans(s1) + s -= s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) + s = Spans(s1) + s -= s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) + s = Spans(s2) + s -= s3 + self.failUnlessEqual(list(s.each()), [5,6,7]) + + s = Spans(s1) + s += s2 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) + s = Spans(s1) + s += s3 + self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) + s = Spans(s2) + s += s3 + self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) + + def test_random(self): + # attempt to increase coverage of corner cases by comparing behavior + # of a simple-but-slow model implementation against the + # complex-but-fast actual implementation, in a large number of random + # operations + S1 = SimpleSpans + S2 = Spans + s1 = S1(); s2 = S2() + seed = "" + def _create(subseed): + ns1 = S1(); ns2 = S2() + for i in range(10): + what = _hash(subseed+str(i)).hexdigest() + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + ns1.add(start, length); ns2.add(start, length) + return ns1, ns2 + + #print + for i in range(1000): + what = _hash(seed+str(i)).hexdigest() + op = what[0] + subop = what[1] + start = int(what[2:4], 16) + length = max(1,int(what[5:6], 16)) + #print what + if op in "0": + if subop in "01234": + s1 = S1(); s2 = S2() + elif subop in "5678": + s1 = S1(start, length); s2 = S2(start, length) + else: + s1 = S1(s1); s2 = S2(s2) + #print "s2 = %s" % s2.dump() + elif op in "123": + #print "s2.add(%d,%d)" % (start, length) + s1.add(start, length); s2.add(start, length) + elif op in "456": + #print "s2.remove(%d,%d)" % (start, length) + s1.remove(start, length); s2.remove(start, length) + elif op in "78": + ns1, ns2 = _create(what[7:11]) + #print "s2 + %s" % ns2.dump() + s1 = s1 + ns1; s2 = s2 + ns2 + elif op in "9a": + ns1, ns2 = _create(what[7:11]) + #print "%s - %s" % (s2.dump(), ns2.dump()) + s1 = s1 - ns1; s2 = s2 - ns2 + elif op in "bc": + ns1, ns2 = _create(what[7:11]) + #print "s2 += %s" % ns2.dump() + s1 += ns1; s2 += ns2 + elif op in "de": + ns1, ns2 = _create(what[7:11]) + #print "%s -= %s" % (s2.dump(), ns2.dump()) + s1 -= ns1; s2 -= ns2 + else: + ns1, ns2 = _create(what[7:11]) + #print "%s &= %s" % (s2.dump(), ns2.dump()) + s1 = s1 & ns1; s2 = s2 & ns2 + #print "s2 now %s" % s2.dump() + self.failUnlessEqual(list(s1.each()), list(s2.each())) + self.failUnlessEqual(s1.len(), s2.len()) + self.failUnlessEqual(bool(s1), bool(s2)) + self.failUnlessEqual(list(s1), list(s2)) + for j in range(10): + what = _hash(what[12:14]+str(j)).hexdigest() + start = int(what[2:4], 16) + length = max(1, int(what[5:6], 16)) + span = (start, length) + self.failUnlessEqual(bool(span in s1), bool(span in s2)) + + + # s() + # s(start,length) + # s(s0) + # s.add(start,length) : returns s + # s.remove(start,length) + # s.each() -> list of byte offsets, mostly for testing + # list(s) -> list of (start,length) tuples, one per span + # (start,length) in s -> True if (start..start+length-1) are all members + # NOT equivalent to x in list(s) + # s.len() -> number of bytes, for testing, bool(), and accounting/limiting + # bool(s) (__nonzeron__) + # s = s1+s2, s1-s2, +=s1, -=s1 + + def test_overlap(self): + for a in range(20): + for b in range(10): + for c in range(20): + for d in range(10): + self._test_overlap(a,b,c,d) + + def _test_overlap(self, a, b, c, d): + s1 = set(range(a,a+b)) + s2 = set(range(c,c+d)) + #print "---" + #self._show_overlap(s1, "1") + #self._show_overlap(s2, "2") + o = overlap(a,b,c,d) + expected = s1.intersection(s2) + if not expected: + self.failUnlessEqual(o, None) + else: + start,length = o + so = set(range(start,start+length)) + #self._show(so, "o") + self.failUnlessEqual(so, expected) + + def _show_overlap(self, s, c): + import sys + out = sys.stdout + if s: + for i in range(max(s)): + if i in s: + out.write(c) + else: + out.write(" ") + out.write("\n") + +def extend(s, start, length, fill): + if len(s) >= start+length: + return s + assert len(fill) == 1 + return s + fill*(start+length-len(s)) + +def replace(s, start, data): + assert len(s) >= start+len(data) + return s[:start] + data + s[start+len(data):] + +class SimpleDataSpans: + def __init__(self, other=None): + self.missing = "" # "1" where missing, "0" where found + self.data = "" + if other: + for (start, data) in other.get_chunks(): + self.add(start, data) + + def __nonzero__(self): # this gets us bool() + return self.len() + def len(self): + return len(self.missing.replace("1", "")) + def _dump(self): + return [i for (i,c) in enumerate(self.missing) if c == "0"] + def _have(self, start, length): + m = self.missing[start:start+length] + if not m or len(m)