def foo(): pass # keep the line number constant
-import os, time
+import os, time, sys
from StringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.python import log
+from pycryptopp.hash.sha256 import SHA256 as _hash
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
from allmydata.util import limiter, time_format, pollmixin, cachedir
from allmydata.util import statistics, dictutil, pipeline
+from allmydata.util import log as tahoe_log
+from allmydata.util.spans import Spans, overlap, DataSpans
class Base32(unittest.TestCase):
def test_b2a_matches_Pythons(self):
try:
func(*args, **kwargs)
self.fail(msg)
- except AssertionError, e:
+ except AssertionError:
pass
def failUnlessListEqual(self, a, b, msg = None):
self.fail("assert was not caught")
def should_not_assert(self, func, *args, **kwargs):
- if "re" in kwargs:
- regexp = kwargs["re"]
- del kwargs["re"]
try:
func(*args, **kwargs)
except AssertionError, e:
fileutil.rm_dir(basedir)
fileutil.remove_if_possible(fn) # should survive errors
+ def test_write_atomically(self):
+ basedir = "util/FileUtil/test_write_atomically"
+ fileutil.make_dirs(basedir)
+ fn = os.path.join(basedir, "here")
+ fileutil.write_atomically(fn, "one")
+ self.failUnlessEqual(fileutil.read(fn), "one")
+ fileutil.write_atomically(fn, "two", mode="") # non-binary
+ self.failUnlessEqual(fileutil.read(fn), "two")
+
def test_open_or_create(self):
basedir = "util/FileUtil/test_open_or_create"
fileutil.make_dirs(basedir)
f.write("stuff.")
f.close()
f = fileutil.open_or_create(fn)
- f.seek(0, 2)
+ f.seek(0, os.SEEK_END)
f.write("more.")
f.close()
f = open(fn, "r")
used = fileutil.du(basedir)
self.failUnlessEqual(10+11+12+13, used)
+ def test_abspath_expanduser_unicode(self):
+ self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, "bytestring")
+
+ saved_cwd = os.path.normpath(os.getcwdu())
+ abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
+ self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
+ self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
+ self.failUnlessEqual(abspath_cwd, saved_cwd)
+
+ # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
+
+ self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+ self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
+
+ cwds = ['cwd']
+ try:
+ cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
+ or 'ascii'))
+ except UnicodeEncodeError:
+ pass # the cwd can't be encoded -- test with ascii cwd only
+
+ for cwd in cwds:
+ try:
+ os.mkdir(cwd)
+ os.chdir(cwd)
+ for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
+ uabspath = fileutil.abspath_expanduser_unicode(upath)
+ self.failUnless(isinstance(uabspath, unicode), uabspath)
+ finally:
+ os.chdir(saved_cwd)
+
+ def test_disk_stats(self):
+ avail = fileutil.get_available_space('.', 2**14)
+ if avail == 0:
+ raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+ disk = fileutil.get_disk_stats('.', 2**13)
+ self.failUnless(disk['total'] > 0, disk['total'])
+ self.failUnless(disk['used'] > 0, disk['used'])
+ self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+ self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+ self.failUnless(disk['avail'] > 0, disk['avail'])
+
+ def test_disk_stats_avail_nonnegative(self):
+ # This test will spuriously fail if you have more than 2^128
+ # bytes of available space on your filesystem.
+ disk = fileutil.get_disk_stats('.', 2**128)
+ self.failUnlessEqual(disk['avail'], 0)
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
self.failUnlessEqual(p("10MiB"), 10*1024*1024)
self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
- e = self.failUnlessRaises(ValueError, p, "12 cubits")
- self.failUnless("12 cubits" in str(e))
+ self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+ self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+ self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+ self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+ e = self.failUnlessRaises(ValueError, p, "12 cubits")
+ self.failUnlessIn("12 cubits", str(e))
+ e = self.failUnlessRaises(ValueError, p, "1 BB")
+ self.failUnlessIn("1 BB", str(e))
+ e = self.failUnlessRaises(ValueError, p, "fhtagn")
+ self.failUnlessIn("fhtagn", str(e))
+
class Limiter(unittest.TestCase):
timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
self.failUnlessEqual(thatmomentinmarch, 1237585742.226536)
self.failUnlessEqual(origtzname, time.tzname)
+ def test_iso_utc(self):
+ when = 1266760143.7841301
+ out = time_format.iso_utc_date(when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc_date(t=lambda: when)
+ self.failUnlessEqual(out, "2010-02-21")
+ out = time_format.iso_utc(when)
+ self.failUnlessEqual(out, "2010-02-21_13:49:03.784130")
+ out = time_format.iso_utc(when, sep="-")
+ self.failUnlessEqual(out, "2010-02-21-13:49:03.784130")
+
+ def test_parse_duration(self):
+ p = time_format.parse_duration
+ DAY = 24*60*60
+ self.failUnlessEqual(p("1 day"), DAY)
+ self.failUnlessEqual(p("2 days"), 2*DAY)
+ self.failUnlessEqual(p("3 months"), 3*31*DAY)
+ self.failUnlessEqual(p("4 mo"), 4*31*DAY)
+ self.failUnlessEqual(p("5 years"), 5*365*DAY)
+ e = self.failUnlessRaises(ValueError, p, "123")
+ self.failUnlessIn("no unit (like day, month, or year) in '123'",
+ str(e))
+
+ def test_parse_date(self):
+ self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
+
class CacheDir(unittest.TestCase):
def test_basic(self):
basedir = "test_util/CacheDir/test_basic"
_failIfExists("a")
_failUnlessExists("b")
_failUnlessExists("c")
+ del b2
ctr = [0]
class EqButNotIs:
ds.discard(2, "c")
self.failIf(2 in ds)
- ds.union(1, ["a", "e"])
- ds.union(3, ["f"])
- self.failUnlessEqual(ds[1], set(["a","e"]))
- self.failUnlessEqual(ds[3], set(["f"]))
+ ds.add(3, "f")
ds2 = dictutil.DictOfSets()
ds2.add(3, "f")
ds2.add(3, "g")
ds2.add(4, "h")
ds.update(ds2)
- self.failUnlessEqual(ds[1], set(["a","e"]))
+ self.failUnlessEqual(ds[1], set(["a"]))
self.failUnlessEqual(ds[3], set(["f", "g"]))
self.failUnlessEqual(ds[4], set(["h"]))
self.calls[1][0].callback("two-result")
self.calls[2][0].errback(ValueError("three-error"))
+
+ del d1,d2,d3,d4
+
+class SampleError(Exception):
+ pass
+
+class Log(unittest.TestCase):
+ def test_err(self):
+ if not hasattr(self, "flushLoggedErrors"):
+ # without flushLoggedErrors, we can't get rid of the
+ # twisted.log.err that tahoe_log records, so we can't keep this
+ # test from [ERROR]ing
+ raise unittest.SkipTest("needs flushLoggedErrors from Twisted-2.5.0")
+ try:
+ raise SampleError("simple sample")
+ except:
+ f = Failure()
+ tahoe_log.err(format="intentional sample error",
+ failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
+ self.flushLoggedErrors(SampleError)
+
+
+class SimpleSpans:
+ # this is a simple+inefficient form of util.spans.Spans . We compare the
+ # behavior of this reference model against the real (efficient) form.
+
+ def __init__(self, _span_or_start=None, length=None):
+ self._have = set()
+ if length is not None:
+ for i in range(_span_or_start, _span_or_start+length):
+ self._have.add(i)
+ elif _span_or_start:
+ for (start,length) in _span_or_start:
+ self.add(start, length)
+
+ def add(self, start, length):
+ for i in range(start, start+length):
+ self._have.add(i)
+ return self
+
+ def remove(self, start, length):
+ for i in range(start, start+length):
+ self._have.discard(i)
+ return self
+
+ def each(self):
+ return sorted(self._have)
+
+ def __iter__(self):
+ items = sorted(self._have)
+ prevstart = None
+ prevend = None
+ for i in items:
+ if prevstart is None:
+ prevstart = prevend = i
+ continue
+ if i == prevend+1:
+ prevend = i
+ continue
+ yield (prevstart, prevend-prevstart+1)
+ prevstart = prevend = i
+ if prevstart is not None:
+ yield (prevstart, prevend-prevstart+1)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+
+ def len(self):
+ return len(self._have)
+
+ def __add__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.add(start, length)
+ return s
+
+ def __sub__(self, other):
+ s = self.__class__(self)
+ for (start, length) in other:
+ s.remove(start, length)
+ return s
+
+ def __iadd__(self, other):
+ for (start, length) in other:
+ self.add(start, length)
+ return self
+
+ def __isub__(self, other):
+ for (start, length) in other:
+ self.remove(start, length)
+ return self
+
+ def __and__(self, other):
+ s = self.__class__()
+ for i in other.each():
+ if i in self._have:
+ s.add(i, 1)
+ return s
+
+ def __contains__(self, (start,length)):
+ for i in range(start, start+length):
+ if i not in self._have:
+ return False
+ return True
+
+class ByteSpans(unittest.TestCase):
+ def test_basic(self):
+ s = Spans()
+ self.failUnlessEqual(list(s), [])
+ self.failIf(s)
+ self.failIf((0,1) in s)
+ self.failUnlessEqual(s.len(), 0)
+
+ s1 = Spans(3, 4) # 3,4,5,6
+ self._check1(s1)
+
+ s1 = Spans(3L, 4L) # 3,4,5,6
+ self._check1(s1)
+
+ s2 = Spans(s1)
+ self._check1(s2)
+
+ s2.add(10,2) # 10,11
+ self._check1(s1)
+ self.failUnless((10,1) in s2)
+ self.failIf((10,1) in s1)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s2.add(15,2).add(20,2)
+ self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
+ self.failUnlessEqual(s2.len(), 10)
+
+ s2.remove(4,3).remove(15,1)
+ self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
+ self.failUnlessEqual(s2.len(), 6)
+
+ s1 = SimpleSpans(3, 4) # 3 4 5 6
+ s2 = SimpleSpans(5, 4) # 5 6 7 8
+ i = s1 & s2
+ self.failUnlessEqual(list(i.each()), [5, 6])
+
+ def _check1(self, s):
+ self.failUnlessEqual(list(s), [(3,4)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 4)
+ self.failIf((0,1) in s)
+ self.failUnless((3,4) in s)
+ self.failUnless((3,1) in s)
+ self.failUnless((5,2) in s)
+ self.failUnless((6,1) in s)
+ self.failIf((6,2) in s)
+ self.failIf((7,1) in s)
+ self.failUnlessEqual(list(s.each()), [3,4,5,6])
+
+ def test_large(self):
+ s = Spans(4, 2**65) # don't do this with a SimpleSpans
+ self.failUnlessEqual(list(s), [(4, 2**65)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 2**65)
+ self.failIf((0,1) in s)
+ self.failUnless((4,2) in s)
+ self.failUnless((2**65,2) in s)
+
+ def test_math(self):
+ s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
+ s2 = Spans(5, 3) # 5,6,7
+ s3 = Spans(8, 4) # 8,9,10,11
+
+ s = s1 - s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = s1 - s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = s2 - s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s2
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s2 & s1
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+ s = s1 & s3
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s3 & s1
+ self.failUnlessEqual(list(s.each()), [8,9])
+ s = s2 & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & s2
+ self.failUnlessEqual(list(s.each()), [])
+ s = Spans() & s3
+ self.failUnlessEqual(list(s.each()), [])
+ s = s3 & Spans()
+ self.failUnlessEqual(list(s.each()), [])
+
+ s = s1 + s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = s1 + s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = s2 + s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ s = Spans(s1)
+ s -= s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
+ s = Spans(s1)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
+ s = Spans(s2)
+ s -= s3
+ self.failUnlessEqual(list(s.each()), [5,6,7])
+
+ s = Spans(s1)
+ s += s2
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
+ s = Spans(s1)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
+ s = Spans(s2)
+ s += s3
+ self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleSpans
+ S2 = Spans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, length); ns2.add(start, length)
+ return ns1, ns2
+
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "01234":
+ s1 = S1(); s2 = S2()
+ elif subop in "5678":
+ s1 = S1(start, length); s2 = S2(start, length)
+ else:
+ s1 = S1(s1); s2 = S2(s2)
+ #print "s2 = %s" % s2.dump()
+ elif op in "123":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, length); s2.add(start, length)
+ elif op in "456":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ elif op in "78":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 + %s" % ns2.dump()
+ s1 = s1 + ns1; s2 = s2 + ns2
+ elif op in "9a":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s - %s" % (s2.dump(), ns2.dump())
+ s1 = s1 - ns1; s2 = s2 - ns2
+ elif op in "bc":
+ ns1, ns2 = _create(what[7:11])
+ #print "s2 += %s" % ns2.dump()
+ s1 += ns1; s2 += ns2
+ elif op in "de":
+ ns1, ns2 = _create(what[7:11])
+ #print "%s -= %s" % (s2.dump(), ns2.dump())
+ s1 -= ns1; s2 -= ns2
+ else:
+ ns1, ns2 = _create(what[7:11])
+ #print "%s &= %s" % (s2.dump(), ns2.dump())
+ s1 = s1 & ns1; s2 = s2 & ns2
+ #print "s2 now %s" % s2.dump()
+ self.failUnlessEqual(list(s1.each()), list(s2.each()))
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(bool(s1), bool(s2))
+ self.failUnlessEqual(list(s1), list(s2))
+ for j in range(10):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ span = (start, length)
+ self.failUnlessEqual(bool(span in s1), bool(span in s2))
+
+
+ # s()
+ # s(start,length)
+ # s(s0)
+ # s.add(start,length) : returns s
+ # s.remove(start,length)
+ # s.each() -> list of byte offsets, mostly for testing
+ # list(s) -> list of (start,length) tuples, one per span
+ # (start,length) in s -> True if (start..start+length-1) are all members
+ # NOT equivalent to x in list(s)
+ # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
+ # bool(s) (__nonzeron__)
+ # s = s1+s2, s1-s2, +=s1, -=s1
+
+ def test_overlap(self):
+ for a in range(20):
+ for b in range(10):
+ for c in range(20):
+ for d in range(10):
+ self._test_overlap(a,b,c,d)
+
+ def _test_overlap(self, a, b, c, d):
+ s1 = set(range(a,a+b))
+ s2 = set(range(c,c+d))
+ #print "---"
+ #self._show_overlap(s1, "1")
+ #self._show_overlap(s2, "2")
+ o = overlap(a,b,c,d)
+ expected = s1.intersection(s2)
+ if not expected:
+ self.failUnlessEqual(o, None)
+ else:
+ start,length = o
+ so = set(range(start,start+length))
+ #self._show(so, "o")
+ self.failUnlessEqual(so, expected)
+
+ def _show_overlap(self, s, c):
+ import sys
+ out = sys.stdout
+ if s:
+ for i in range(max(s)):
+ if i in s:
+ out.write(c)
+ else:
+ out.write(" ")
+ out.write("\n")
+
+def extend(s, start, length, fill):
+ if len(s) >= start+length:
+ return s
+ assert len(fill) == 1
+ return s + fill*(start+length-len(s))
+
+def replace(s, start, data):
+ assert len(s) >= start+len(data)
+ return s[:start] + data + s[start+len(data):]
+
+class SimpleDataSpans:
+ def __init__(self, other=None):
+ self.missing = "" # "1" where missing, "0" where found
+ self.data = ""
+ if other:
+ for (start, data) in other.get_chunks():
+ self.add(start, data)
+
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+ def len(self):
+ return len(self.missing.replace("1", ""))
+ def _dump(self):
+ return [i for (i,c) in enumerate(self.missing) if c == "0"]
+ def _have(self, start, length):
+ m = self.missing[start:start+length]
+ if not m or len(m)<length or int(m):
+ return False
+ return True
+ def get_chunks(self):
+ for i in self._dump():
+ yield (i, self.data[i])
+ def get_spans(self):
+ return SimpleSpans([(start,len(data))
+ for (start,data) in self.get_chunks()])
+ def get(self, start, length):
+ if self._have(start, length):
+ return self.data[start:start+length]
+ return None
+ def pop(self, start, length):
+ data = self.get(start, length)
+ if data:
+ self.remove(start, length)
+ return data
+ def remove(self, start, length):
+ self.missing = replace(extend(self.missing, start, length, "1"),
+ start, "1"*length)
+ def add(self, start, data):
+ self.missing = replace(extend(self.missing, start, len(data), "1"),
+ start, "0"*len(data))
+ self.data = replace(extend(self.data, start, len(data), " "),
+ start, data)
+
+
+class StringSpans(unittest.TestCase):
+ def do_basic(self, klass):
+ ds = klass()
+ self.failUnlessEqual(ds.len(), 0)
+ self.failUnlessEqual(list(ds._dump()), [])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
+ s = ds.get_spans()
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ ds.remove(0, 4)
+
+ ds.add(2, "four")
+ self.failUnlessEqual(ds.len(), 4)
+ self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+ s = ds.get_spans()
+ self.failUnless((2,2) in s)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+ self.failUnlessEqual(ds.get(4, 4), None)
+
+ ds2 = klass(ds)
+ self.failUnlessEqual(ds2.len(), 4)
+ self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
+ self.failUnlessEqual(ds2.get(0, 4), None)
+ self.failUnlessEqual(ds2.pop(0, 4), None)
+ self.failUnlessEqual(ds2.pop(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 1)
+ self.failUnlessEqual(ds2.get(2, 3), None)
+ self.failUnlessEqual(ds2.get(5, 1), "r")
+ self.failUnlessEqual(ds.get(2, 3), "fou")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
+
+ ds.add(0, "23")
+ self.failUnlessEqual(ds.len(), 6)
+ self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
+ self.failUnlessEqual(ds.get(0, 4), "23fo")
+ self.failUnlessEqual(ds.pop(0, 4), "23fo")
+ self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 2)
+ self.failUnlessEqual(ds.get(0, 4), None)
+ self.failUnlessEqual(ds.pop(0, 4), None)
+
+ ds = klass()
+ ds.add(2, "four")
+ ds.add(3, "ea")
+ self.failUnlessEqual(ds.get(2, 4), "fear")
+
+ ds = klass()
+ ds.add(2L, "four")
+ ds.add(3L, "ea")
+ self.failUnlessEqual(ds.get(2L, 4L), "fear")
+
+
+ def do_scan(self, klass):
+ # do a test with gaps and spans of size 1 and 2
+ # left=(1,11) * right=(1,11) * gapsize=(1,2)
+ # 111, 112, 121, 122, 211, 212, 221, 222
+ # 211
+ # 121
+ # 112
+ # 212
+ # 222
+ # 221
+ # 111
+ # 122
+ # 11 1 1 11 11 11 1 1 111
+ # 0123456789012345678901234567
+ # abcdefghijklmnopqrstuvwxyz-=
+ pieces = [(1, "bc"),
+ (4, "e"),
+ (7, "h"),
+ (9, "jk"),
+ (12, "mn"),
+ (16, "qr"),
+ (20, "u"),
+ (22, "w"),
+ (25, "z-="),
+ ]
+ p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
+ S = "abcdefghijklmnopqrstuvwxyz-="
+ # TODO: when adding data, add capital letters, to make sure we aren't
+ # just leaving the old data in place
+ l = len(S)
+ def base():
+ ds = klass()
+ for start, data in pieces:
+ ds.add(start, data)
+ return ds
+ def dump(s):
+ p = set(s._dump())
+ d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
+ assert len(d) == l
+ return d
+ DEBUG = False
+ for start in range(0, l):
+ for end in range(start+1, l):
+ # add [start-end) to the baseline
+ which = "%d-%d" % (start, end-1)
+ p_added = set(range(start, end))
+ b = base()
+ if DEBUG:
+ print
+ print dump(b), which
+ add = klass(); add.add(start, S[start:end])
+ print dump(add)
+ b.add(start, S[start:end])
+ if DEBUG:
+ print dump(b)
+ # check that the new span is there
+ d = b.get(start, end-start)
+ self.failUnlessEqual(d, S[start:end], which)
+ # check that all the original pieces are still there
+ for t_start, t_data in pieces:
+ t_len = len(t_data)
+ self.failUnlessEqual(b.get(t_start, t_len),
+ S[t_start:t_start+t_len],
+ "%s %d+%d" % (which, t_start, t_len))
+ # check that a lot of subspans are mostly correct
+ for t_start in range(l):
+ for t_len in range(1,4):
+ d = b.get(t_start, t_len)
+ if d is not None:
+ which2 = "%s+(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(d, S[t_start:t_start+t_len],
+ which2)
+ # check that removing a subspan gives the right value
+ b2 = klass(b)
+ b2.remove(t_start, t_len)
+ removed = set(range(t_start, t_start+t_len))
+ for i in range(l):
+ exp = (((i in p_elements) or (i in p_added))
+ and (i not in removed))
+ which2 = "%s-(%d-%d)" % (which, t_start,
+ t_start+t_len-1)
+ self.failUnlessEqual(bool(b2.get(i, 1)), exp,
+ which2+" %d" % i)
+
+ def test_test(self):
+ self.do_basic(SimpleDataSpans)
+ self.do_scan(SimpleDataSpans)
+
+ def test_basic(self):
+ self.do_basic(DataSpans)
+ self.do_scan(DataSpans)
+
+ def test_random(self):
+ # attempt to increase coverage of corner cases by comparing behavior
+ # of a simple-but-slow model implementation against the
+ # complex-but-fast actual implementation, in a large number of random
+ # operations
+ S1 = SimpleDataSpans
+ S2 = DataSpans
+ s1 = S1(); s2 = S2()
+ seed = ""
+ def _randstr(length, seed):
+ created = 0
+ pieces = []
+ while created < length:
+ piece = _hash(seed + str(created)).hexdigest()
+ pieces.append(piece)
+ created += len(piece)
+ return "".join(pieces)[:length]
+ def _create(subseed):
+ ns1 = S1(); ns2 = S2()
+ for i in range(10):
+ what = _hash(subseed+str(i)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ ns1.add(start, _randstr(length, what[7:9]));
+ ns2.add(start, _randstr(length, what[7:9]))
+ return ns1, ns2
+
+ #print
+ for i in range(1000):
+ what = _hash(seed+str(i)).hexdigest()
+ op = what[0]
+ subop = what[1]
+ start = int(what[2:4], 16)
+ length = max(1,int(what[5:6], 16))
+ #print what
+ if op in "0":
+ if subop in "0123456":
+ s1 = S1(); s2 = S2()
+ else:
+ s1, s2 = _create(what[7:11])
+ #print "s2 = %s" % list(s2._dump())
+ elif op in "123456":
+ #print "s2.add(%d,%d)" % (start, length)
+ s1.add(start, _randstr(length, what[7:9]));
+ s2.add(start, _randstr(length, what[7:9]))
+ elif op in "789abc":
+ #print "s2.remove(%d,%d)" % (start, length)
+ s1.remove(start, length); s2.remove(start, length)
+ else:
+ #print "s2.pop(%d,%d)" % (start, length)
+ d1 = s1.pop(start, length); d2 = s2.pop(start, length)
+ self.failUnlessEqual(d1, d2)
+ #print "s1 now %s" % list(s1._dump())
+ #print "s2 now %s" % list(s2._dump())
+ self.failUnlessEqual(s1.len(), s2.len())
+ self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
+ for j in range(100):
+ what = _hash(what[12:14]+str(j)).hexdigest()
+ start = int(what[2:4], 16)
+ length = max(1, int(what[5:6], 16))
+ d1 = s1.get(start, length); d2 = s2.get(start, length)
+ self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))