def foo(): pass # keep the line number constant
-import os, time
+import os, time, sys
from StringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.python import log
+from pycryptopp.hash.sha256 import SHA256 as _hash
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
from allmydata.util import limiter, time_format, pollmixin, cachedir
from allmydata.util import statistics, dictutil, pipeline
from allmydata.util import log as tahoe_log
+from allmydata.util.spans import Spans, overlap, DataSpans
+from allmydata.test.common_util import ReallyEqualMixin
+
class Base32(unittest.TestCase):
def test_b2a_matches_Pythons(self):
m = self.should_assert(f, False, othermsg="message2")
self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
-class FileUtil(unittest.TestCase):
+class FileUtil(ReallyEqualMixin, unittest.TestCase):
def mkdir(self, basedir, path, mode=0777):
fn = os.path.join(basedir, path)
fileutil.make_dirs(fn, mode)
fileutil.rm_dir(basedir)
fileutil.remove_if_possible(fn) # should survive errors
- def test_open_or_create(self):
- basedir = "util/FileUtil/test_open_or_create"
+ def test_write_atomically(self):
+ basedir = "util/FileUtil/test_write_atomically"
fileutil.make_dirs(basedir)
fn = os.path.join(basedir, "here")
- f = fileutil.open_or_create(fn)
- f.write("stuff.")
- f.close()
- f = fileutil.open_or_create(fn)
- f.seek(0, 2)
- f.write("more.")
- f.close()
- f = open(fn, "r")
- data = f.read()
- f.close()
- self.failUnlessEqual(data, "stuff.more.")
-
- def test_NamedTemporaryDirectory(self):
- basedir = "util/FileUtil/test_NamedTemporaryDirectory"
- fileutil.make_dirs(basedir)
- td = fileutil.NamedTemporaryDirectory(dir=basedir)
- name = td.name
- self.failUnless(basedir in name)
- self.failUnless(basedir in repr(td))
- self.failUnless(os.path.isdir(name))
- del td
- # it is conceivable that we need to force gc here, but I'm not sure
- self.failIf(os.path.isdir(name))
+ fileutil.write_atomically(fn, "one")
+ self.failUnlessEqual(fileutil.read(fn), "one")
+ fileutil.write_atomically(fn, "two", mode="") # non-binary
+ self.failUnlessEqual(fileutil.read(fn), "two")
def test_rename(self):
basedir = "util/FileUtil/test_rename"
self.failIf(os.path.exists(fn))
self.failUnless(os.path.exists(fn2))
+ def test_rename_no_overwrite(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
+ fileutil.make_dirs(workdir)
+
+ source_path = os.path.join(workdir, "source")
+ dest_path = os.path.join(workdir, "dest")
+
+ # when neither file exists
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+
+ # when only dest exists
+ fileutil.write(dest_path, "dest")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when both exist
+ fileutil.write(source_path, "source")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(source_path), "source")
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when only source exists
+ os.remove(dest_path)
+ fileutil.rename_no_overwrite(source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "source")
+ self.failIf(os.path.exists(source_path))
+
+ def test_replace_file(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
+ fileutil.make_dirs(workdir)
+
+ backup_path = os.path.join(workdir, "backup")
+ replaced_path = os.path.join(workdir, "replaced")
+ replacement_path = os.path.join(workdir, "replacement")
+
+ # when none of the files exist
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+
+ # when only replaced exists
+ fileutil.write(replaced_path, "foo")
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "foo")
+
+ # when both replaced and replacement exist, but not backup
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
+ # when only replacement exists
+ os.remove(backup_path)
+ os.remove(replaced_path)
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+ self.failIf(os.path.exists(backup_path))
+
+ # when replaced, replacement and backup all exist
+ fileutil.write(replaced_path, "foo")
+ fileutil.write(replacement_path, "bar")
+ fileutil.write(backup_path, "bak")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
def test_du(self):
basedir = "util/FileUtil/test_du"
fileutil.make_dirs(basedir)
used = fileutil.du(basedir)
self.failUnlessEqual(10+11+12+13, used)
+ def test_abspath_expanduser_unicode(self):
+ self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, "bytestring")
+
+ saved_cwd = os.path.normpath(os.getcwdu())
+ abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
+ abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
+ self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
+ self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
+ if sys.platform == "win32":
+ self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
+ else:
+ self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
+ self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
+
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
+
+ # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
+
+ foo = fileutil.abspath_expanduser_unicode(u"foo")
+ self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
+
+ foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
+ self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
+
+ if sys.platform == "win32":
+ # This is checking that a drive letter is added for a path without one.
+ baz = fileutil.abspath_expanduser_unicode(u"\\baz")
+ self.failUnless(baz.startswith(u"\\\\?\\"), baz)
+ self.failUnlessReallyEqual(baz[5 :], u":\\baz")
+
+ bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
+ self.failUnless(bar.startswith(u"\\\\?\\"), bar)
+ self.failUnlessReallyEqual(bar[5 :], u":\\bar")
+ # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+ self.failUnlessReallyEqual(baz[4], bar[4]) # same drive
+
+ baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
+ self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
+ self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
+
+ bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
+ self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
+ self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
+ # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+ self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0]) # same drive
+
+ self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
+ self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
+
+ cwds = ['cwd']
+ try:
+ cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
+ or 'ascii'))
+ except UnicodeEncodeError:
+ pass # the cwd can't be encoded -- test with ascii cwd only
+
+ for cwd in cwds:
+ try:
+ os.mkdir(cwd)
+ os.chdir(cwd)
+ for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
+ uabspath = fileutil.abspath_expanduser_unicode(upath)
+ self.failUnless(isinstance(uabspath, unicode), uabspath)
+
+ uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
+ self.failUnless(isinstance(uabspath_notlong, unicode), uabspath_notlong)
+ finally:
+ os.chdir(saved_cwd)
+
+ def test_create_long_path(self):
+ workdir = u"test_create_long_path"
+ fileutil.make_dirs(workdir)
+ long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255))
+ def _cleanup():
+ fileutil.remove(long_path)
+ self.addCleanup(_cleanup)
+
+ fileutil.write(long_path, "test")
+ self.failUnless(os.path.exists(long_path))
+ self.failUnlessEqual(fileutil.read(long_path), "test")
+ _cleanup()
+ self.failIf(os.path.exists(long_path))
+
+ def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
+ def call_windows_getenv(name):
+ if name == u"USERPROFILE": return userprofile
+ if name == u"HOMEDRIVE": return homedrive
+ if name == u"HOMEPATH": return homepath
+ self.fail("unexpected argument to call_windows_getenv")
+ self.patch(fileutil, 'windows_getenv', call_windows_getenv)
+
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
+
+ def test_windows_expanduser_xp(self):
+ return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
+
+ def test_windows_expanduser_win7(self):
+ return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+
+ def test_disk_stats(self):
+ avail = fileutil.get_available_space('.', 2**14)
+ if avail == 0:
+ raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+ disk = fileutil.get_disk_stats('.', 2**13)
+ self.failUnless(disk['total'] > 0, disk['total'])
+ # we tolerate used==0 for a Travis-CI bug, see #2290
+ self.failUnless(disk['used'] >= 0, disk['used'])
+ self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+ self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+ self.failUnless(disk['avail'] > 0, disk['avail'])
+
+ def test_disk_stats_avail_nonnegative(self):
+ # This test will spuriously fail if you have more than 2^128
+ # bytes of available space on your filesystem.
+ disk = fileutil.get_disk_stats('.', 2**128)
+ self.failUnlessEqual(disk['avail'], 0)
+
+ def test_get_pathinfo(self):
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ # create a directory
+ self.mkdir(basedir, "a")
+ dirinfo = fileutil.get_pathinfo(basedir)
+ self.failUnlessTrue(dirinfo.isdir)
+ self.failUnlessTrue(dirinfo.exists)
+ self.failUnlessFalse(dirinfo.isfile)
+ self.failUnlessFalse(dirinfo.islink)
+
+ # create a file
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+ fileinfo = fileutil.get_pathinfo(f)
+ self.failUnlessTrue(fileinfo.isfile)
+ self.failUnlessTrue(fileinfo.exists)
+ self.failUnlessFalse(fileinfo.isdir)
+ self.failUnlessFalse(fileinfo.islink)
+ self.failUnlessEqual(fileinfo.size, 10)
+
+ # path at which nothing exists
+ dnename = os.path.join(basedir, "doesnotexist")
+ now = time.time()
+ dneinfo = fileutil.get_pathinfo(dnename, now=now)
+ self.failUnlessFalse(dneinfo.exists)
+ self.failUnlessFalse(dneinfo.isfile)
+ self.failUnlessFalse(dneinfo.isdir)
+ self.failUnlessFalse(dneinfo.islink)
+ self.failUnlessEqual(dneinfo.size, None)
+ self.failUnlessEqual(dneinfo.mtime, now)
+ self.failUnlessEqual(dneinfo.ctime, now)
+
+ def test_get_pathinfo_symlink(self):
+ if not hasattr(os, 'symlink'):
+ raise unittest.SkipTest("can't create symlinks on this platform")
+
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+
+ # create a symlink pointing to 1.txt
+ slname = os.path.join(basedir, "linkto1.txt")
+ os.symlink(f, slname)
+ symlinkinfo = fileutil.get_pathinfo(slname)
+ self.failUnlessTrue(symlinkinfo.islink)
+ self.failUnlessTrue(symlinkinfo.exists)
+ self.failUnlessFalse(symlinkinfo.isfile)
+ self.failUnlessFalse(symlinkinfo.isdir)
+
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
d.addCallbacks(_suc, _err)
return d
-class DeferredUtilTests(unittest.TestCase):
+class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
def test_gather_results(self):
d1 = defer.Deferred()
d2 = defer.Deferred()
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(ValueError))
+ def test_wait_for_delayed_calls(self):
+ """
+ This tests that 'wait_for_delayed_calls' does in fact wait for a
+ delayed call that is active when the test returns. If it didn't,
+ Trial would report an unclean reactor error for this test.
+ """
+ def _trigger():
+ #print "trigger"
+ pass
+ reactor.callLater(0.1, _trigger)
+
+ d = defer.succeed(None)
+ d.addBoth(self.wait_for_delayed_calls)
+ return d
+
class HashUtilTests(unittest.TestCase):
def test_random_key(self):
h2.update("foo")
self.failUnlessEqual(h1, h2.digest())
- def test_constant_time_compare(self):
- self.failUnless(hashutil.constant_time_compare("a", "a"))
- self.failUnless(hashutil.constant_time_compare("ab", "ab"))
- self.failIf(hashutil.constant_time_compare("a", "b"))
- self.failIf(hashutil.constant_time_compare("a", "aa"))
+ def test_timing_safe_compare(self):
+ self.failUnless(hashutil.timing_safe_compare("a", "a"))
+ self.failUnless(hashutil.timing_safe_compare("ab", "ab"))
+ self.failIf(hashutil.timing_safe_compare("a", "b"))
+ self.failIf(hashutil.timing_safe_compare("a", "aa"))
def _testknown(self, hashf, expected_a, *args):
got = hashf(*args)
(1000*1000*1000, "1.00 GB"),
(1000*1000*1000*1000, "1.00 TB"),
(1000*1000*1000*1000*1000, "1.00 PB"),
- (1234567890123456, "1.23 PB"),
+ (1000*1000*1000*1000*1000*1000, "1.00 EB"),
+ (1234567890123456789, "1.23 EB"),
]
for (x, expected) in tests_si:
got = abbreviate.abbreviate_space(x, SI=True)
(1024*1024*1024*1024, "1.00 TiB"),
(1000*1000*1000*1000*1000, "909.49 TiB"),
(1024*1024*1024*1024*1024, "1.00 PiB"),
- (1234567890123456, "1.10 PiB"),
+ (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
+ (1234567890123456789, "1.07 EiB"),
]
for (x, expected) in tests_base1024:
got = abbreviate.abbreviate_space(x, SI=False)
self.failUnlessEqual(p("10MiB"), 10*1024*1024)
self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
+ self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+ self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+ self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+ self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+
e = self.failUnlessRaises(ValueError, p, "12 cubits")
- self.failUnless("12 cubits" in str(e))
+ self.failUnlessIn("12 cubits", str(e))
+ e = self.failUnlessRaises(ValueError, p, "1 BB")
+ self.failUnlessIn("1 BB", str(e))
+ e = self.failUnlessRaises(ValueError, p, "fhtagn")
+ self.failUnlessIn("fhtagn", str(e))
class Limiter(unittest.TestCase):
timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
self.failUnlessEqual(thatmomentinmarch, 1237585742.226536)
self.failUnlessEqual(origtzname, time.tzname)
+ def test_iso_utc(self):
+ when = 1266760143.7841301
+ out = time_format.iso_utc_date(when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc_date(t=lambda: when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc(when)
+ self.failUnlessEqual(out, "2010-02-21_13:49:03.784130")
+ out = time_format.iso_utc(when, sep="-")
+ self.failUnlessEqual(out, "2010-02-21-13:49:03.784130")
+
+ def test_parse_duration(self):
+ p = time_format.parse_duration
+ DAY = 24*60*60
+ self.failUnlessEqual(p("1 day"), DAY)
+ self.failUnlessEqual(p("2 days"), 2*DAY)
+ self.failUnlessEqual(p("3 months"), 3*31*DAY)
+ self.failUnlessEqual(p("4 mo"), 4*31*DAY)
+ self.failUnlessEqual(p("5 years"), 5*365*DAY)
+ e = self.failUnlessRaises(ValueError, p, "123")
+ self.failUnlessIn("no unit (like day, month, or year) in '123'",
+ str(e))
+
+ def test_parse_date(self):
+ self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
+
class CacheDir(unittest.TestCase):
def test_basic(self):
basedir = "test_util/CacheDir/test_basic"
_failIfExists("a")
_failUnlessExists("b")
_failUnlessExists("c")
+ del b2
ctr = [0]
class EqButNotIs:
ds.discard(2, "c")
self.failIf(2 in ds)
- ds.union(1, ["a", "e"])
- ds.union(3, ["f"])
- self.failUnlessEqual(ds[1], set(["a","e"]))
- self.failUnlessEqual(ds[3], set(["f"]))
+ ds.add(3, "f")
ds2 = dictutil.DictOfSets()
ds2.add(3, "f")
ds2.add(3, "g")
ds2.add(4, "h")
ds.update(ds2)
- self.failUnlessEqual(ds[1], set(["a","e"]))
+ self.failUnlessEqual(ds[1], set(["a"]))
self.failUnlessEqual(ds[3], set(["f", "g"]))
self.failUnlessEqual(ds[4], set(["h"]))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
+ del d1,d2,d3,d4
+
class SampleError(Exception):
pass
tahoe_log.err(format="intentional sample error",
failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
self.flushLoggedErrors(SampleError)
+
+
+class SimpleSpans:
+ # this is a simple+inefficient form of util.spans.Spans . We compare the
+ # behavior of this reference model against the real (efficient) form.
+
+ def __init__(self, _span_or_start=None, length=None):
+ self._have = set()
+ if length is not None:
+ for i in range(_span_or_start, _span_or_start+length):
+ self._have.add(i)
+ elif _span_or_start:
+ for (start,length) in _span_or_start:
+ self.add(start, length)
+
+ def add(self, start, length):
+ for i in range(start, start+length):
+ self._have.add(i)
+ return self
+
+ def remove(self, start, length):
+ for i in range(start, start+length):
+ self._have.discard(i)
+ return self
+
+ def each(self):
+ return sorted(self._have)
+
+ def __iter__(self):
+ items = sorted(self._have)
+ prevstart = None
+ prevend = None
+ for i in items:
+ if prevstart is None:
+ prevstart = prevend = i
+ continue
+ if i == prevend+1:
+ prevend = i
+ continue
+ yield (prevstart, prevend-prevstart+1)
+ prevstart = prevend = i
+ if prevstart is not None:
+ yield (prevstart, prevend-prevstart+1)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+
+ def len(self):
+ return len(self._have)
+
+ def __add__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.add(start, length)
+ return s
+
+ def __sub__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.remove(start, length)
+ return s
+
+ def __iadd__(self, other):
+ for (start, length) in other:
+ self.add(start, length)
+ return self
+
+ def __isub__(self, other):
+ for (start, length) in other:
+ self.remove(start, length)
+ return self
+
+ def __and__(self, other):
+ s = self.__class__()
+ for i in other.each():
+ if i in self._have:
+ s.add(i, 1)
+ return s
+
+ def __contains__(self, (start,length)):
+ for i in range(start, start+length):
+ if i not in self._have:
+ return False
+ return True
+
+class ByteSpans(unittest.TestCase):
+ def test_basic(self):
+ s = Spans()
+ self.failUnlessEqual(list(s), [])
+ self.failIf(s)
+ self.failIf((0,1) in s)
+ self.failUnlessEqual(s.len(), 0)
+
+ s1 = Spans(3, 4) # 3,4,5,6
+ self._check1(s1)
+
+ s1 = Spans(3L, 4L) # 3,4,5,6
+ self._check1(s1)
+
+ s2 = Spans(s1)
+ self._check1(s2)
+
+ s2.add(10,2) # 10,11
+ self._check1(s1)
+ self.failUnless((10,1) in s2)
+ self.failIf((10,1) in s1)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s2.add(15,2).add(20,2)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
+ self.failUnlessEqual(s2.len(), 10)
+
+ s2.remove(4,3).remove(15,1)
+ self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s1 = SimpleSpans(3, 4) # 3 4 5 6
+ s2 = SimpleSpans(5, 4) # 5 6 7 8
+ i = s1 & s2
+ self.failUnlessEqual(list(i.each()), [5, 6])
+
+ def _check1(self, s):
+ self.failUnlessEqual(list(s), [(3,4)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 4)
+ self.failIf((0,1) in s)
+ self.failUnless((3,4) in s)
+ self.failUnless((3,1) in s)
+ self.failUnless((5,2) in s)
+ self.failUnless((6,1) in s)
+ self.failIf((6,2) in s)
+ self.failIf((7,1) in s)
+ self.failUnlessEqual(list(s.each()), [3,4,5,6])
+
+ def test_large(self):
+ s = Spans(4, 2**65) # don't do this with a SimpleSpans
+ self.failUnlessEqual(list(s), [(4, 2**65)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 2**65)
+ self.failIf((0,1) in s)
+ self.failUnless((4,2) in s)
+ self.failUnless((2**65,2) in s)
+
+ def test_math(self):
+ s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
+ s2 = Spans(5, 3) # 5,6,7
+ s3 = Spans(8, 4) # 8,9,10,11
+
+ s = s1 - s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = s1 - s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = s2 - s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s2
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s2 & s1
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s3
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s3 & s1
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s2 & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & s2
+ self.failUnlessEqual(list(s.each()), [])
+ s = Spans() & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & Spans()
+ self.failUnlessEqual(list(s.each()), [])
+
+ s = s1 + s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = s1 + s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = s2 + s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ s = Spans(s1)
+ s -= s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = Spans(s1)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = Spans(s2)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+
+ s = Spans(s1)
+ s += s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = Spans(s1)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = Spans(s2)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleSpans
+ S2 = Spans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, length); ns2.add(start, length)
+ return ns1, ns2
+
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "01234":
+ s1 = S1(); s2 = S2()
+ elif subop in "5678":
+ s1 = S1(start, length); s2 = S2(start, length)
+ else:
+ s1 = S1(s1); s2 = S2(s2)
+ #print "s2 = %s" % s2.dump()
+ elif op in "123":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, length); s2.add(start, length)
+ elif op in "456":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ elif op in "78":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 + %s" % ns2.dump()
+ s1 = s1 + ns1; s2 = s2 + ns2
+ elif op in "9a":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s - %s" % (s2.dump(), ns2.dump())
+ s1 = s1 - ns1; s2 = s2 - ns2
+ elif op in "bc":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 += %s" % ns2.dump()
+ s1 += ns1; s2 += ns2
+ elif op in "de":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s -= %s" % (s2.dump(), ns2.dump())
+ s1 -= ns1; s2 -= ns2
+ else:
+ ns1, ns2 = _create(what[7:11])
+ #print "%s &= %s" % (s2.dump(), ns2.dump())
+ s1 = s1 & ns1; s2 = s2 & ns2
+ #print "s2 now %s" % s2.dump()
+ self.failUnlessEqual(list(s1.each()), list(s2.each()))
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(bool(s1), bool(s2))
+ self.failUnlessEqual(list(s1), list(s2))
+ for j in range(10):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ span = (start, length)
+ self.failUnlessEqual(bool(span in s1), bool(span in s2))
+
+
+ # s()
+ # s(start,length)
+ # s(s0)
+ # s.add(start,length) : returns s
+ # s.remove(start,length)
+ # s.each() -> list of byte offsets, mostly for testing
+ # list(s) -> list of (start,length) tuples, one per span
+ # (start,length) in s -> True if (start..start+length-1) are all members
+ # NOT equivalent to x in list(s)
+ # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
+ # bool(s) (__nonzeron__)
+ # s = s1+s2, s1-s2, +=s1, -=s1
+
+ def test_overlap(self):
+ for a in range(20):
+ for b in range(10):
+ for c in range(20):
+ for d in range(10):
+ self._test_overlap(a,b,c,d)
+
+ def _test_overlap(self, a, b, c, d):
+ s1 = set(range(a,a+b))
+ s2 = set(range(c,c+d))
+ #print "---"
+ #self._show_overlap(s1, "1")
+ #self._show_overlap(s2, "2")
+ o = overlap(a,b,c,d)
+ expected = s1.intersection(s2)
+ if not expected:
+ self.failUnlessEqual(o, None)
+ else:
+ start,length = o
+ so = set(range(start,start+length))
+ #self._show(so, "o")
+ self.failUnlessEqual(so, expected)
+
+ def _show_overlap(self, s, c):
+ import sys
+ out = sys.stdout
+ if s:
+ for i in range(max(s)):
+ if i in s:
+ out.write(c)
+ else:
+ out.write(" ")
+ out.write("\n")
+
+def extend(s, start, length, fill):
+ if len(s) >= start+length:
+ return s
+ assert len(fill) == 1
+ return s + fill*(start+length-len(s))
+
+def replace(s, start, data):
+ assert len(s) >= start+len(data)
+ return s[:start] + data + s[start+len(data):]
+
+class SimpleDataSpans:
+ def __init__(self, other=None):
+ self.missing = "" # "1" where missing, "0" where found
+ self.data = ""
+ if other:
+ for (start, data) in other.get_chunks():
+ self.add(start, data)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+ def len(self):
+ return len(self.missing.replace("1", ""))
+ def _dump(self):
+ return [i for (i,c) in enumerate(self.missing) if c == "0"]
+ def _have(self, start, length):
+ m = self.missing[start:start+length]
+ if not m or len(m)<length or int(m):
+ return False
+ return True
+ def get_chunks(self):
+ for i in self._dump():
+ yield (i, self.data[i])
+ def get_spans(self):
+ return SimpleSpans([(start,len(data))
+ for (start,data) in self.get_chunks()])
+ def get(self, start, length):
+ if self._have(start, length):
+ return self.data[start:start+length]
+ return None
+ def pop(self, start, length):
+ data = self.get(start, length)
+ if data:
+ self.remove(start, length)
+ return data
+ def remove(self, start, length):
+ self.missing = replace(extend(self.missing, start, length, "1"),
+ start, "1"*length)
+ def add(self, start, data):
+ self.missing = replace(extend(self.missing, start, len(data), "1"),
+ start, "0"*len(data))
+ self.data = replace(extend(self.data, start, len(data), " "),
+ start, data)
+
+
+class StringSpans(unittest.TestCase):
+ def do_basic(self, klass):
+ ds = klass()
+ self.failUnlessEqual(ds.len(), 0)
+ self.failUnlessEqual(list(ds._dump()), [])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
+ s1 = ds.get_spans()
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ ds.remove(0, 4)
+
+ ds.add(2, "four")
+ self.failUnlessEqual(ds.len(), 4)
+ self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+ s1 = ds.get_spans()
+ self.failUnless((2,2) in s1)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ self.failUnlessEqual(ds.get(4, 4), None)
+
+ ds2 = klass(ds)
+ self.failUnlessEqual(ds2.len(), 4)
+ self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
+ self.failUnlessEqual(ds2.get(0, 4), None)
+ self.failUnlessEqual(ds2.pop(0, 4), None)
+ self.failUnlessEqual(ds2.pop(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 1)
+ self.failUnlessEqual(ds2.get(2, 3), None)
+ self.failUnlessEqual(ds2.get(5, 1), "r")
+ self.failUnlessEqual(ds.get(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+
+ ds.add(0, "23")
+ self.failUnlessEqual(ds.len(), 6)
+ self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
+ self.failUnlessEqual(ds.get(0, 4), "23fo")
+ self.failUnlessEqual(ds.pop(0, 4), "23fo")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 2)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+
+ ds = klass()
+ ds.add(2, "four")
+ ds.add(3, "ea")
+ self.failUnlessEqual(ds.get(2, 4), "fear")
+
+ ds = klass()
+ ds.add(2L, "four")
+ ds.add(3L, "ea")
+ self.failUnlessEqual(ds.get(2L, 4L), "fear")
+
+
+ def do_scan(self, klass):
+ # do a test with gaps and spans of size 1 and 2
+ # left=(1,11) * right=(1,11) * gapsize=(1,2)
+ # 111, 112, 121, 122, 211, 212, 221, 222
+ # 211
+ # 121
+ # 112
+ # 212
+ # 222
+ # 221
+ # 111
+ # 122
+ # 11 1 1 11 11 11 1 1 111
+ # 0123456789012345678901234567
+ # abcdefghijklmnopqrstuvwxyz-=
+ pieces = [(1, "bc"),
+ (4, "e"),
+ (7, "h"),
+ (9, "jk"),
+ (12, "mn"),
+ (16, "qr"),
+ (20, "u"),
+ (22, "w"),
+ (25, "z-="),
+ ]
+ p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
+ S = "abcdefghijklmnopqrstuvwxyz-="
+ # TODO: when adding data, add capital letters, to make sure we aren't
+ # just leaving the old data in place
+ l = len(S)
+ def base():
+ ds = klass()
+ for start, data in pieces:
+ ds.add(start, data)
+ return ds
+ def dump(s):
+ p = set(s._dump())
+ d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
+ assert len(d) == l
+ return d
+ DEBUG = False
+ for start in range(0, l):
+ for end in range(start+1, l):
+ # add [start-end) to the baseline
+ which = "%d-%d" % (start, end-1)
+ p_added = set(range(start, end))
+ b = base()
+ if DEBUG:
+ print
+ print dump(b), which
+ add = klass(); add.add(start, S[start:end])
+ print dump(add)
+ b.add(start, S[start:end])
+ if DEBUG:
+ print dump(b)
+ # check that the new span is there
+ d = b.get(start, end-start)
+ self.failUnlessEqual(d, S[start:end], which)
+ # check that all the original pieces are still there
+ for t_start, t_data in pieces:
+ t_len = len(t_data)
+ self.failUnlessEqual(b.get(t_start, t_len),
+ S[t_start:t_start+t_len],
+ "%s %d+%d" % (which, t_start, t_len))
+ # check that a lot of subspans are mostly correct
+ for t_start in range(l):
+ for t_len in range(1,4):
+ d = b.get(t_start, t_len)
+ if d is not None:
+ which2 = "%s+(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(d, S[t_start:t_start+t_len],
+ which2)
+ # check that removing a subspan gives the right value
+ b2 = klass(b)
+ b2.remove(t_start, t_len)
+ removed = set(range(t_start, t_start+t_len))
+ for i in range(l):
+ exp = (((i in p_elements) or (i in p_added))
+ and (i not in removed))
+ which2 = "%s-(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(bool(b2.get(i, 1)), exp,
+ which2+" %d" % i)
+
+ def test_test(self):
+ self.do_basic(SimpleDataSpans)
+ self.do_scan(SimpleDataSpans)
+
+ def test_basic(self):
+ self.do_basic(DataSpans)
+ self.do_scan(DataSpans)
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleDataSpans
+ S2 = DataSpans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _randstr(length, seed):
+ created = 0
+ pieces = []
+ while created < length:
+ piece = _hash(seed + str(created)).hexdigest()
+ pieces.append(piece)
+ created += len(piece)
+ return "".join(pieces)[:length]
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, _randstr(length, what[7:9]));
+ ns2.add(start, _randstr(length, what[7:9]))
+ return ns1, ns2
+
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "0123456":
+ s1 = S1(); s2 = S2()
+ else:
+ s1, s2 = _create(what[7:11])
+ #print "s2 = %s" % list(s2._dump())
+ elif op in "123456":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, _randstr(length, what[7:9]));
+ s2.add(start, _randstr(length, what[7:9]))
+ elif op in "789abc":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ else:
+ #print "s2.pop(%d,%d)" % (start, length)
+ d1 = s1.pop(start, length); d2 = s2.pop(start, length)
+ self.failUnlessEqual(d1, d2)
+ #print "s1 now %s" % list(s1._dump())
+ #print "s2 now %s" % list(s2._dump())
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
+ for j in range(100):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ d1 = s1.get(start, length); d2 = s2.get(start, length)
+ self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))