]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/test/test_util.py
Simplify an existing test by using TimezoneMixin.
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_util.py
index 0f973363bcb356d59a14ad3327d791006db91496..25b656156ee1cec51752e0f29b5a0ef98c7d1eac 100644 (file)
@@ -7,7 +7,7 @@ from twisted.trial import unittest
 from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 from twisted.python import log
-from hashlib import md5
+from pycryptopp.hash.sha256 import SHA256 as _hash
 
 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
@@ -15,6 +15,8 @@ from allmydata.util import limiter, time_format, pollmixin, cachedir
 from allmydata.util import statistics, dictutil, pipeline
 from allmydata.util import log as tahoe_log
 from allmydata.util.spans import Spans, overlap, DataSpans
+from allmydata.test.common_util import ReallyEqualMixin, TimezoneMixin
+
 
 class Base32(unittest.TestCase):
     def test_b2a_matches_Pythons(self):
@@ -370,7 +372,7 @@ class Asserts(unittest.TestCase):
         m = self.should_assert(f, False, othermsg="message2")
         self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
 
-class FileUtil(unittest.TestCase):
+class FileUtil(ReallyEqualMixin, unittest.TestCase):
     def mkdir(self, basedir, path, mode=0777):
         fn = os.path.join(basedir, path)
         fileutil.make_dirs(fn, mode)
@@ -420,33 +422,14 @@ class FileUtil(unittest.TestCase):
         fileutil.rm_dir(basedir)
         fileutil.remove_if_possible(fn) # should survive errors
 
-    def test_open_or_create(self):
-        basedir = "util/FileUtil/test_open_or_create"
+    def test_write_atomically(self):
+        basedir = "util/FileUtil/test_write_atomically"
         fileutil.make_dirs(basedir)
         fn = os.path.join(basedir, "here")
-        f = fileutil.open_or_create(fn)
-        f.write("stuff.")
-        f.close()
-        f = fileutil.open_or_create(fn)
-        f.seek(0, 2)
-        f.write("more.")
-        f.close()
-        f = open(fn, "r")
-        data = f.read()
-        f.close()
-        self.failUnlessEqual(data, "stuff.more.")
-
-    def test_NamedTemporaryDirectory(self):
-        basedir = "util/FileUtil/test_NamedTemporaryDirectory"
-        fileutil.make_dirs(basedir)
-        td = fileutil.NamedTemporaryDirectory(dir=basedir)
-        name = td.name
-        self.failUnless(basedir in name)
-        self.failUnless(basedir in repr(td))
-        self.failUnless(os.path.isdir(name))
-        del td
-        # it is conceivable that we need to force gc here, but I'm not sure
-        self.failIf(os.path.isdir(name))
+        fileutil.write_atomically(fn, "one")
+        self.failUnlessEqual(fileutil.read(fn), "one")
+        fileutil.write_atomically(fn, "two", mode="") # non-binary
+        self.failUnlessEqual(fileutil.read(fn), "two")
 
     def test_rename(self):
         basedir = "util/FileUtil/test_rename"
@@ -479,11 +462,38 @@ class FileUtil(unittest.TestCase):
         abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
         self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
         self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
-        self.failUnlessEqual(abspath_cwd, saved_cwd)
+        if sys.platform == "win32":
+            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
+        else:
+            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
+
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
+        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
 
         # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
 
-        self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+        foo = fileutil.abspath_expanduser_unicode(u"foo")
+        self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
+
+        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
+        self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
+
+        if sys.platform == "win32":
+            # This is checking that a drive letter is added for a path without one.
+            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
+            self.failUnless(baz.startswith(u"\\\\?\\"), baz)
+            self.failUnlessReallyEqual(baz[5 :], u":\\baz")
+
+            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
+            self.failUnless(bar.startswith(u"\\\\?\\"), bar)
+            self.failUnlessReallyEqual(bar[5 :], u":\\bar")
+            # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+            self.failUnlessReallyEqual(baz[4], bar[4])  # same drive
+
         self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
 
         cwds = ['cwd']
@@ -503,6 +513,60 @@ class FileUtil(unittest.TestCase):
             finally:
                 os.chdir(saved_cwd)
 
+    def test_create_long_path(self):
+        workdir = u"test_create_long_path"
+        fileutil.make_dirs(workdir)
+        long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255))
+        def _cleanup():
+            fileutil.remove(long_path)
+        self.addCleanup(_cleanup)
+
+        fileutil.write(long_path, "test")
+        self.failUnless(os.path.exists(long_path))
+        self.failUnlessEqual(fileutil.read(long_path), "test")
+        _cleanup()
+        self.failIf(os.path.exists(long_path))
+
+    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
+        def call_windows_getenv(name):
+            if name == u"USERPROFILE": return userprofile
+            if name == u"HOMEDRIVE":   return homedrive
+            if name == u"HOMEPATH":    return homepath
+            self.fail("unexpected argument to call_windows_getenv")
+        self.patch(fileutil, 'windows_getenv', call_windows_getenv)
+
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
+        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
+
+    def test_windows_expanduser_xp(self):
+        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
+
+    def test_windows_expanduser_win7(self):
+        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+
+    def test_disk_stats(self):
+        avail = fileutil.get_available_space('.', 2**14)
+        if avail == 0:
+            raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+        disk = fileutil.get_disk_stats('.', 2**13)
+        self.failUnless(disk['total'] > 0, disk['total'])
+        # we tolerate used==0 for a Travis-CI bug, see #2290
+        self.failUnless(disk['used'] >= 0, disk['used'])
+        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+        self.failUnless(disk['avail'] > 0, disk['avail'])
+
+    def test_disk_stats_avail_nonnegative(self):
+        # This test will spuriously fail if you have more than 2^128
+        # bytes of available space on your filesystem.
+        disk = fileutil.get_disk_stats('.', 2**128)
+        self.failUnlessEqual(disk['avail'], 0)
+
 class PollMixinTests(unittest.TestCase):
     def setUp(self):
         self.pm = pollmixin.PollMixin()
@@ -530,7 +594,7 @@ class PollMixinTests(unittest.TestCase):
         d.addCallbacks(_suc, _err)
         return d
 
-class DeferredUtilTests(unittest.TestCase):
+class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
     def test_gather_results(self):
         d1 = defer.Deferred()
         d2 = defer.Deferred()
@@ -570,6 +634,21 @@ class DeferredUtilTests(unittest.TestCase):
         self.failUnless(isinstance(f, Failure))
         self.failUnless(f.check(ValueError))
 
+    def test_wait_for_delayed_calls(self):
+        """
+        This tests that 'wait_for_delayed_calls' does in fact wait for a
+        delayed call that is active when the test returns. If it didn't,
+        Trial would report an unclean reactor error for this test.
+        """
+        def _trigger():
+            #print "trigger"
+            pass
+        reactor.callLater(0.1, _trigger)
+
+        d = defer.succeed(None)
+        d.addBoth(self.wait_for_delayed_calls)
+        return d
+
 class HashUtilTests(unittest.TestCase):
 
     def test_random_key(self):
@@ -632,11 +711,11 @@ class HashUtilTests(unittest.TestCase):
         h2.update("foo")
         self.failUnlessEqual(h1, h2.digest())
 
-    def test_constant_time_compare(self):
-        self.failUnless(hashutil.constant_time_compare("a", "a"))
-        self.failUnless(hashutil.constant_time_compare("ab", "ab"))
-        self.failIf(hashutil.constant_time_compare("a", "b"))
-        self.failIf(hashutil.constant_time_compare("a", "aa"))
+    def test_timing_safe_compare(self):
+        self.failUnless(hashutil.timing_safe_compare("a", "a"))
+        self.failUnless(hashutil.timing_safe_compare("ab", "ab"))
+        self.failIf(hashutil.timing_safe_compare("a", "b"))
+        self.failIf(hashutil.timing_safe_compare("a", "aa"))
 
     def _testknown(self, hashf, expected_a, *args):
         got = hashf(*args)
@@ -706,7 +785,8 @@ class Abbreviate(unittest.TestCase):
                     (1000*1000*1000, "1.00 GB"),
                     (1000*1000*1000*1000, "1.00 TB"),
                     (1000*1000*1000*1000*1000, "1.00 PB"),
-                    (1234567890123456, "1.23 PB"),
+                    (1000*1000*1000*1000*1000*1000, "1.00 EB"),
+                    (1234567890123456789, "1.23 EB"),
                     ]
         for (x, expected) in tests_si:
             got = abbreviate.abbreviate_space(x, SI=True)
@@ -726,7 +806,8 @@ class Abbreviate(unittest.TestCase):
                           (1024*1024*1024*1024, "1.00 TiB"),
                           (1000*1000*1000*1000*1000, "909.49 TiB"),
                           (1024*1024*1024*1024*1024, "1.00 PiB"),
-                          (1234567890123456, "1.10 PiB"),
+                          (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
+                          (1234567890123456789, "1.07 EiB"),
                     ]
         for (x, expected) in tests_base1024:
             got = abbreviate.abbreviate_space(x, SI=False)
@@ -748,8 +829,19 @@ class Abbreviate(unittest.TestCase):
         self.failUnlessEqual(p("10MiB"), 10*1024*1024)
         self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
         self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
+        self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+        self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+        self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+        self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+        self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+        self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+
         e = self.failUnlessRaises(ValueError, p, "12 cubits")
-        self.failUnless("12 cubits" in str(e))
+        self.failUnlessIn("12 cubits", str(e))
+        e = self.failUnlessRaises(ValueError, p, "1 BB")
+        self.failUnlessIn("1 BB", str(e))
+        e = self.failUnlessRaises(ValueError, p, "fhtagn")
+        self.failUnlessIn("fhtagn", str(e))
 
 class Limiter(unittest.TestCase):
     timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
@@ -826,7 +918,7 @@ class Limiter(unittest.TestCase):
         d.addCallback(_all_done)
         return d
 
-class TimeFormat(unittest.TestCase):
+class TimeFormat(unittest.TestCase, TimezoneMixin):
     def test_epoch(self):
         return self._help_test_epoch()
 
@@ -840,19 +932,8 @@ class TimeFormat(unittest.TestCase):
         # time_format.iso_utc_time_to_localseconds() breaks if the timezone is
         # Europe/London.  (As soon as this unit test is done then I'll change
         # that implementation to something that works even in this case...)
-        origtz = os.environ.get('TZ')
-        os.environ['TZ'] = "Europe/London"
-        if hasattr(time, 'tzset'):
-            time.tzset()
-        try:
-            return self._help_test_epoch()
-        finally:
-            if origtz is None:
-                del os.environ['TZ']
-            else:
-                os.environ['TZ'] = origtz
-            if hasattr(time, 'tzset'):
-                time.tzset()
+        self.setTimezone("Europe/London")
+        return self._help_test_epoch()
 
     def _help_test_epoch(self):
         origtzname = time.tzname
@@ -914,6 +995,55 @@ class TimeFormat(unittest.TestCase):
     def test_parse_date(self):
         self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
 
+    def test_format_time(self):
+        self.failUnlessEqual(time_format.format_time(time.gmtime(0)), '1970-01-01 00:00:00')
+        self.failUnlessEqual(time_format.format_time(time.gmtime(60)), '1970-01-01 00:01:00')
+        self.failUnlessEqual(time_format.format_time(time.gmtime(60*60)), '1970-01-01 01:00:00')
+        seconds_per_day = 60*60*24
+        leap_years_1970_to_2014_inclusive = ((2012 - 1968) // 4)
+        self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2015 - 1970)*365+leap_years_1970_to_2014_inclusive))), '2015-01-01 00:00:00')
+
+    def test_format_time_y2038(self):
+        seconds_per_day = 60*60*24
+        leap_years_1970_to_2047_inclusive = ((2044 - 1968) // 4)
+        self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2048 - 1970)*365+leap_years_1970_to_2047_inclusive))), '2048-01-01 00:00:00')
+
+    test_format_time_y2038.todo = "This test is known to fail on systems with 32-bit time_t."
+
+    def test_format_delta(self):
+        time_1 = 1389812723
+        time_5s_delta = 1389812728
+        time_28m7s_delta = 1389814410
+        time_1h_delta = 1389816323
+        time_1d21h46m49s_delta = 1389977532
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_1), '0s')
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_5s_delta), '5s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_28m7s_delta), '28m 7s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_1h_delta), '1h 0m 0s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1, time_1d21h46m49s_delta), '1d 21h 46m 49s')
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1d21h46m49s_delta, time_1), '-')
+
+        # time_1 with a decimal fraction will make the delta 1s less
+        time_1decimal = 1389812723.383963
+
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_5s_delta), '4s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_28m7s_delta), '28m 6s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_1h_delta), '59m 59s')
+        self.failUnlessEqual(
+            time_format.format_delta(time_1decimal, time_1d21h46m49s_delta), '1d 21h 46m 48s')
+
 class CacheDir(unittest.TestCase):
     def test_basic(self):
         basedir = "test_util/CacheDir/test_basic"
@@ -1092,16 +1222,13 @@ class DictUtil(unittest.TestCase):
         ds.discard(2, "c")
         self.failIf(2 in ds)
 
-        ds.union(1, ["a", "e"])
-        ds.union(3, ["f"])
-        self.failUnlessEqual(ds[1], set(["a","e"]))
-        self.failUnlessEqual(ds[3], set(["f"]))
+        ds.add(3, "f")
         ds2 = dictutil.DictOfSets()
         ds2.add(3, "f")
         ds2.add(3, "g")
         ds2.add(4, "h")
         ds.update(ds2)
-        self.failUnlessEqual(ds[1], set(["a","e"]))
+        self.failUnlessEqual(ds[1], set(["a"]))
         self.failUnlessEqual(ds[3], set(["f", "g"]))
         self.failUnlessEqual(ds[4], set(["h"]))
 
@@ -1614,8 +1741,10 @@ class SimpleSpans:
         if prevstart is not None:
             yield (prevstart, prevend-prevstart+1)
 
-    def __len__(self):
-        # this also gets us bool(s)
+    def __nonzero__(self): # this gets us bool()
+        return self.len()
+
+    def len(self):
         return len(self._have)
 
     def __add__(self, other):
@@ -1659,11 +1788,14 @@ class ByteSpans(unittest.TestCase):
         self.failUnlessEqual(list(s), [])
         self.failIf(s)
         self.failIf((0,1) in s)
-        self.failUnlessEqual(len(s), 0)
+        self.failUnlessEqual(s.len(), 0)
 
         s1 = Spans(3, 4) # 3,4,5,6
         self._check1(s1)
 
+        s1 = Spans(3L, 4L) # 3,4,5,6
+        self._check1(s1)
+
         s2 = Spans(s1)
         self._check1(s2)
 
@@ -1672,15 +1804,15 @@ class ByteSpans(unittest.TestCase):
         self.failUnless((10,1) in s2)
         self.failIf((10,1) in s1)
         self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
-        self.failUnlessEqual(len(s2), 6)
+        self.failUnlessEqual(s2.len(), 6)
 
         s2.add(15,2).add(20,2)
         self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
-        self.failUnlessEqual(len(s2), 10)
+        self.failUnlessEqual(s2.len(), 10)
 
         s2.remove(4,3).remove(15,1)
         self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
-        self.failUnlessEqual(len(s2), 6)
+        self.failUnlessEqual(s2.len(), 6)
 
         s1 = SimpleSpans(3, 4) # 3 4 5 6
         s2 = SimpleSpans(5, 4) # 5 6 7 8
@@ -1690,7 +1822,7 @@ class ByteSpans(unittest.TestCase):
     def _check1(self, s):
         self.failUnlessEqual(list(s), [(3,4)])
         self.failUnless(s)
-        self.failUnlessEqual(len(s), 4)
+        self.failUnlessEqual(s.len(), 4)
         self.failIf((0,1) in s)
         self.failUnless((3,4) in s)
         self.failUnless((3,1) in s)
@@ -1700,6 +1832,15 @@ class ByteSpans(unittest.TestCase):
         self.failIf((7,1) in s)
         self.failUnlessEqual(list(s.each()), [3,4,5,6])
 
+    def test_large(self):
+        s = Spans(4, 2**65) # don't do this with a SimpleSpans
+        self.failUnlessEqual(list(s), [(4, 2**65)])
+        self.failUnless(s)
+        self.failUnlessEqual(s.len(), 2**65)
+        self.failIf((0,1) in s)
+        self.failUnless((4,2) in s)
+        self.failUnless((2**65,2) in s)
+
     def test_math(self):
         s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
         s2 = Spans(5, 3) # 5,6,7
@@ -1767,7 +1908,7 @@ class ByteSpans(unittest.TestCase):
         def _create(subseed):
             ns1 = S1(); ns2 = S2()
             for i in range(10):
-                what = md5(subseed+str(i)).hexdigest()
+                what = _hash(subseed+str(i)).hexdigest()
                 start = int(what[2:4], 16)
                 length = max(1,int(what[5:6], 16))
                 ns1.add(start, length); ns2.add(start, length)
@@ -1775,7 +1916,7 @@ class ByteSpans(unittest.TestCase):
 
         #print
         for i in range(1000):
-            what = md5(seed+str(i)).hexdigest()
+            what = _hash(seed+str(i)).hexdigest()
             op = what[0]
             subop = what[1]
             start = int(what[2:4], 16)
@@ -1817,11 +1958,11 @@ class ByteSpans(unittest.TestCase):
                 s1 = s1 & ns1; s2 = s2 & ns2
             #print "s2 now %s" % s2.dump()
             self.failUnlessEqual(list(s1.each()), list(s2.each()))
-            self.failUnlessEqual(len(s1), len(s2))
+            self.failUnlessEqual(s1.len(), s2.len())
             self.failUnlessEqual(bool(s1), bool(s2))
             self.failUnlessEqual(list(s1), list(s2))
             for j in range(10):
-                what = md5(what[12:14]+str(j)).hexdigest()
+                what = _hash(what[12:14]+str(j)).hexdigest()
                 start = int(what[2:4], 16)
                 length = max(1, int(what[5:6], 16))
                 span = (start, length)
@@ -1837,8 +1978,8 @@ class ByteSpans(unittest.TestCase):
     # list(s) -> list of (start,length) tuples, one per span
     # (start,length) in s -> True if (start..start+length-1) are all members
     #  NOT equivalent to x in list(s)
-    # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
-    # bool(s)  (__len__)
+    # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
+    # bool(s)  (__nonzeron__)
     # s = s1+s2, s1-s2, +=s1, -=s1
 
     def test_overlap(self):
@@ -1893,7 +2034,9 @@ class SimpleDataSpans:
             for (start, data) in other.get_chunks():
                 self.add(start, data)
 
-    def __len__(self):
+    def __nonzero__(self): # this gets us bool()
+        return self.len()
+    def len(self):
         return len(self.missing.replace("1", ""))
     def _dump(self):
         return [i for (i,c) in enumerate(self.missing) if c == "0"]
@@ -1930,26 +2073,26 @@ class SimpleDataSpans:
 class StringSpans(unittest.TestCase):
     def do_basic(self, klass):
         ds = klass()
-        self.failUnlessEqual(len(ds), 0)
+        self.failUnlessEqual(ds.len(), 0)
         self.failUnlessEqual(list(ds._dump()), [])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
-        s = ds.get_spans()
+        s1 = ds.get_spans()
         self.failUnlessEqual(ds.get(0, 4), None)
         self.failUnlessEqual(ds.pop(0, 4), None)
         ds.remove(0, 4)
 
         ds.add(2, "four")
-        self.failUnlessEqual(len(ds), 4)
+        self.failUnlessEqual(ds.len(), 4)
         self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
-        s = ds.get_spans()
-        self.failUnless((2,2) in s)
+        s1 = ds.get_spans()
+        self.failUnless((2,2) in s1)
         self.failUnlessEqual(ds.get(0, 4), None)
         self.failUnlessEqual(ds.pop(0, 4), None)
         self.failUnlessEqual(ds.get(4, 4), None)
 
         ds2 = klass(ds)
-        self.failUnlessEqual(len(ds2), 4)
+        self.failUnlessEqual(ds2.len(), 4)
         self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
         self.failUnlessEqual(ds2.get(0, 4), None)
@@ -1962,7 +2105,7 @@ class StringSpans(unittest.TestCase):
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
 
         ds.add(0, "23")
-        self.failUnlessEqual(len(ds), 6)
+        self.failUnlessEqual(ds.len(), 6)
         self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
         self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
         self.failUnlessEqual(ds.get(0, 4), "23fo")
@@ -1976,6 +2119,12 @@ class StringSpans(unittest.TestCase):
         ds.add(3, "ea")
         self.failUnlessEqual(ds.get(2, 4), "fear")
 
+        ds = klass()
+        ds.add(2L, "four")
+        ds.add(3L, "ea")
+        self.failUnlessEqual(ds.get(2L, 4L), "fear")
+
+
     def do_scan(self, klass):
         # do a test with gaps and spans of size 1 and 2
         #  left=(1,11) * right=(1,11) * gapsize=(1,2)
@@ -2013,9 +2162,7 @@ class StringSpans(unittest.TestCase):
             return ds
         def dump(s):
             p = set(s._dump())
-            # wow, this is the first time I've ever wanted ?: in python
-            # note: this requires python2.5
-            d = "".join([(S[i] if i in p else " ") for i in range(l)])
+            d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
             assert len(d) == l
             return d
         DEBUG = False
@@ -2084,14 +2231,14 @@ class StringSpans(unittest.TestCase):
             created = 0
             pieces = []
             while created < length:
-                piece = md5(seed + str(created)).hexdigest()
+                piece = _hash(seed + str(created)).hexdigest()
                 pieces.append(piece)
                 created += len(piece)
             return "".join(pieces)[:length]
         def _create(subseed):
             ns1 = S1(); ns2 = S2()
             for i in range(10):
-                what = md5(subseed+str(i)).hexdigest()
+                what = _hash(subseed+str(i)).hexdigest()
                 start = int(what[2:4], 16)
                 length = max(1,int(what[5:6], 16))
                 ns1.add(start, _randstr(length, what[7:9]));
@@ -2100,7 +2247,7 @@ class StringSpans(unittest.TestCase):
 
         #print
         for i in range(1000):
-            what = md5(seed+str(i)).hexdigest()
+            what = _hash(seed+str(i)).hexdigest()
             op = what[0]
             subop = what[1]
             start = int(what[2:4], 16)
@@ -2125,10 +2272,10 @@ class StringSpans(unittest.TestCase):
                 self.failUnlessEqual(d1, d2)
             #print "s1 now %s" % list(s1._dump())
             #print "s2 now %s" % list(s2._dump())
-            self.failUnlessEqual(len(s1), len(s2))
+            self.failUnlessEqual(s1.len(), s2.len())
             self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
             for j in range(100):
-                what = md5(what[12:14]+str(j)).hexdigest()
+                what = _hash(what[12:14]+str(j)).hexdigest()
                 start = int(what[2:4], 16)
                 length = max(1, int(what[5:6], 16))
                 d1 = s1.get(start, length); d2 = s2.get(start, length)