from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.python import log
-from hashlib import md5
+from pycryptopp.hash.sha256 import SHA256 as _hash
from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
fileutil.rm_dir(basedir)
fileutil.remove_if_possible(fn) # should survive errors
+ def test_write_atomically(self):
+ basedir = "util/FileUtil/test_write_atomically"
+ fileutil.make_dirs(basedir)
+ fn = os.path.join(basedir, "here")
+ fileutil.write_atomically(fn, "one")
+ self.failUnlessEqual(fileutil.read(fn), "one")
+ fileutil.write_atomically(fn, "two", mode="") # non-binary
+ self.failUnlessEqual(fileutil.read(fn), "two")
+
def test_open_or_create(self):
basedir = "util/FileUtil/test_open_or_create"
fileutil.make_dirs(basedir)
f.write("stuff.")
f.close()
f = fileutil.open_or_create(fn)
- f.seek(0, 2)
+ f.seek(0, os.SEEK_END)
f.write("more.")
f.close()
f = open(fn, "r")
finally:
os.chdir(saved_cwd)
+ def test_disk_stats(self):
+ avail = fileutil.get_available_space('.', 2**14)
+ if avail == 0:
+ raise unittest.SkipTest("This test will spuriously fail there is no disk space left.")
+
+ disk = fileutil.get_disk_stats('.', 2**13)
+ self.failUnless(disk['total'] > 0, disk['total'])
+ # we tolerate used==0 for a Travis-CI bug, see #2290
+ self.failUnless(disk['used'] >= 0, disk['used'])
+ self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
+ self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
+ self.failUnless(disk['avail'] > 0, disk['avail'])
+
+ def test_disk_stats_avail_nonnegative(self):
+ # This test will spuriously fail if you have more than 2^128
+ # bytes of available space on your filesystem.
+ disk = fileutil.get_disk_stats('.', 2**128)
+ self.failUnlessEqual(disk['avail'], 0)
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
h2.update("foo")
self.failUnlessEqual(h1, h2.digest())
- def test_constant_time_compare(self):
- self.failUnless(hashutil.constant_time_compare("a", "a"))
- self.failUnless(hashutil.constant_time_compare("ab", "ab"))
- self.failIf(hashutil.constant_time_compare("a", "b"))
- self.failIf(hashutil.constant_time_compare("a", "aa"))
+ def test_timing_safe_compare(self):
+ self.failUnless(hashutil.timing_safe_compare("a", "a"))
+ self.failUnless(hashutil.timing_safe_compare("ab", "ab"))
+ self.failIf(hashutil.timing_safe_compare("a", "b"))
+ self.failIf(hashutil.timing_safe_compare("a", "aa"))
def _testknown(self, hashf, expected_a, *args):
got = hashf(*args)
(1000*1000*1000, "1.00 GB"),
(1000*1000*1000*1000, "1.00 TB"),
(1000*1000*1000*1000*1000, "1.00 PB"),
- (1234567890123456, "1.23 PB"),
+ (1000*1000*1000*1000*1000*1000, "1.00 EB"),
+ (1234567890123456789, "1.23 EB"),
]
for (x, expected) in tests_si:
got = abbreviate.abbreviate_space(x, SI=True)
(1024*1024*1024*1024, "1.00 TiB"),
(1000*1000*1000*1000*1000, "909.49 TiB"),
(1024*1024*1024*1024*1024, "1.00 PiB"),
- (1234567890123456, "1.10 PiB"),
+ (1024*1024*1024*1024*1024*1024, "1.00 EiB"),
+ (1234567890123456789, "1.07 EiB"),
]
for (x, expected) in tests_base1024:
got = abbreviate.abbreviate_space(x, SI=False)
self.failUnlessEqual(p("10MiB"), 10*1024*1024)
self.failUnlessEqual(p("5G"), 5*1000*1000*1000)
self.failUnlessEqual(p("4GiB"), 4*1024*1024*1024)
+ self.failUnlessEqual(p("3TB"), 3*1000*1000*1000*1000)
+ self.failUnlessEqual(p("3TiB"), 3*1024*1024*1024*1024)
+ self.failUnlessEqual(p("6PB"), 6*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("6PiB"), 6*1024*1024*1024*1024*1024)
+ self.failUnlessEqual(p("9EB"), 9*1000*1000*1000*1000*1000*1000)
+ self.failUnlessEqual(p("9EiB"), 9*1024*1024*1024*1024*1024*1024)
+
e = self.failUnlessRaises(ValueError, p, "12 cubits")
- self.failUnless("12 cubits" in str(e))
+ self.failUnlessIn("12 cubits", str(e))
+ e = self.failUnlessRaises(ValueError, p, "1 BB")
+ self.failUnlessIn("1 BB", str(e))
+ e = self.failUnlessRaises(ValueError, p, "fhtagn")
+ self.failUnlessIn("fhtagn", str(e))
class Limiter(unittest.TestCase):
timeout = 480 # This takes longer than 240 seconds on Francois's arm box.
def test_parse_date(self):
self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
+ def test_format_delta(self):
+ time_1 = 1389812723
+ time_5s_delta = 1389812728
+ time_28m7s_delta = 1389814410
+ time_1h_delta = 1389816323
+ time_1d21h46m49s_delta = 1389977532
+ TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+ time_1_isostr = time.strftime(TIME_FORMAT, time.localtime(time_1))
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_5s_delta),
+ (time_1_isostr, '5s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_28m7s_delta),
+ (time_1_isostr, '28m7s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_1h_delta),
+ (time_1_isostr, '1h0m0s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_1d21h46m49s_delta),
+ (time_1_isostr, '1d21h46m49s'))
+
+ # time_1 with a decimal fraction will make the delta 1s less
+ time_1decimal = 1389812723.383963
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_5s_delta),
+ (time_1_isostr, '4s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_28m7s_delta),
+ (time_1_isostr, '28m6s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_1h_delta),
+ (time_1_isostr, '59m59s'))
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_1d21h46m49s_delta),
+ (time_1_isostr, '1d21h46m48s'))
+
class CacheDir(unittest.TestCase):
def test_basic(self):
basedir = "test_util/CacheDir/test_basic"
ds.discard(2, "c")
self.failIf(2 in ds)
- ds.union(1, ["a", "e"])
- ds.union(3, ["f"])
- self.failUnlessEqual(ds[1], set(["a","e"]))
- self.failUnlessEqual(ds[3], set(["f"]))
+ ds.add(3, "f")
ds2 = dictutil.DictOfSets()
ds2.add(3, "f")
ds2.add(3, "g")
ds2.add(4, "h")
ds.update(ds2)
- self.failUnlessEqual(ds[1], set(["a","e"]))
+ self.failUnlessEqual(ds[1], set(["a"]))
self.failUnlessEqual(ds[3], set(["f", "g"]))
self.failUnlessEqual(ds[4], set(["h"]))
if prevstart is not None:
yield (prevstart, prevend-prevstart+1)
- def __len__(self):
- # this also gets us bool(s)
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+
+ def len(self):
return len(self._have)
def __add__(self, other):
self.failUnlessEqual(list(s), [])
self.failIf(s)
self.failIf((0,1) in s)
- self.failUnlessEqual(len(s), 0)
+ self.failUnlessEqual(s.len(), 0)
s1 = Spans(3, 4) # 3,4,5,6
self._check1(s1)
+ s1 = Spans(3L, 4L) # 3,4,5,6
+ self._check1(s1)
+
s2 = Spans(s1)
self._check1(s2)
self.failUnless((10,1) in s2)
self.failIf((10,1) in s1)
self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
- self.failUnlessEqual(len(s2), 6)
+ self.failUnlessEqual(s2.len(), 6)
s2.add(15,2).add(20,2)
self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
- self.failUnlessEqual(len(s2), 10)
+ self.failUnlessEqual(s2.len(), 10)
s2.remove(4,3).remove(15,1)
self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
- self.failUnlessEqual(len(s2), 6)
+ self.failUnlessEqual(s2.len(), 6)
s1 = SimpleSpans(3, 4) # 3 4 5 6
s2 = SimpleSpans(5, 4) # 5 6 7 8
def _check1(self, s):
self.failUnlessEqual(list(s), [(3,4)])
self.failUnless(s)
- self.failUnlessEqual(len(s), 4)
+ self.failUnlessEqual(s.len(), 4)
self.failIf((0,1) in s)
self.failUnless((3,4) in s)
self.failUnless((3,1) in s)
self.failIf((7,1) in s)
self.failUnlessEqual(list(s.each()), [3,4,5,6])
+ def test_large(self):
+ s = Spans(4, 2**65) # don't do this with a SimpleSpans
+ self.failUnlessEqual(list(s), [(4, 2**65)])
+ self.failUnless(s)
+ self.failUnlessEqual(s.len(), 2**65)
+ self.failIf((0,1) in s)
+ self.failUnless((4,2) in s)
+ self.failUnless((2**65,2) in s)
+
def test_math(self):
s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
s2 = Spans(5, 3) # 5,6,7
def _create(subseed):
ns1 = S1(); ns2 = S2()
for i in range(10):
- what = md5(subseed+str(i)).hexdigest()
+ what = _hash(subseed+str(i)).hexdigest()
start = int(what[2:4], 16)
length = max(1,int(what[5:6], 16))
ns1.add(start, length); ns2.add(start, length)
#print
for i in range(1000):
- what = md5(seed+str(i)).hexdigest()
+ what = _hash(seed+str(i)).hexdigest()
op = what[0]
subop = what[1]
start = int(what[2:4], 16)
s1 = s1 & ns1; s2 = s2 & ns2
#print "s2 now %s" % s2.dump()
self.failUnlessEqual(list(s1.each()), list(s2.each()))
- self.failUnlessEqual(len(s1), len(s2))
+ self.failUnlessEqual(s1.len(), s2.len())
self.failUnlessEqual(bool(s1), bool(s2))
self.failUnlessEqual(list(s1), list(s2))
for j in range(10):
- what = md5(what[12:14]+str(j)).hexdigest()
+ what = _hash(what[12:14]+str(j)).hexdigest()
start = int(what[2:4], 16)
length = max(1, int(what[5:6], 16))
span = (start, length)
# list(s) -> list of (start,length) tuples, one per span
# (start,length) in s -> True if (start..start+length-1) are all members
# NOT equivalent to x in list(s)
- # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
- # bool(s) (__len__)
+ # s.len() -> number of bytes, for testing, bool(), and accounting/limiting
+ # bool(s) (__nonzeron__)
# s = s1+s2, s1-s2, +=s1, -=s1
def test_overlap(self):
for (start, data) in other.get_chunks():
self.add(start, data)
- def __len__(self):
- return len(self.missing.translate(None, "1"))
+ def __nonzero__(self): # this gets us bool()
+ return self.len()
+ def len(self):
+ return len(self.missing.replace("1", ""))
def _dump(self):
return [i for (i,c) in enumerate(self.missing) if c == "0"]
def _have(self, start, length):
class StringSpans(unittest.TestCase):
def do_basic(self, klass):
ds = klass()
- self.failUnlessEqual(len(ds), 0)
+ self.failUnlessEqual(ds.len(), 0)
self.failUnlessEqual(list(ds._dump()), [])
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
- s = ds.get_spans()
+ s1 = ds.get_spans()
self.failUnlessEqual(ds.get(0, 4), None)
self.failUnlessEqual(ds.pop(0, 4), None)
ds.remove(0, 4)
ds.add(2, "four")
- self.failUnlessEqual(len(ds), 4)
+ self.failUnlessEqual(ds.len(), 4)
self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
- s = ds.get_spans()
- self.failUnless((2,2) in s)
+ s1 = ds.get_spans()
+ self.failUnless((2,2) in s1)
self.failUnlessEqual(ds.get(0, 4), None)
self.failUnlessEqual(ds.pop(0, 4), None)
self.failUnlessEqual(ds.get(4, 4), None)
ds2 = klass(ds)
- self.failUnlessEqual(len(ds2), 4)
+ self.failUnlessEqual(ds2.len(), 4)
self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
self.failUnlessEqual(ds2.get(0, 4), None)
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
ds.add(0, "23")
- self.failUnlessEqual(len(ds), 6)
+ self.failUnlessEqual(ds.len(), 6)
self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
self.failUnlessEqual(ds.get(0, 4), "23fo")
ds.add(3, "ea")
self.failUnlessEqual(ds.get(2, 4), "fear")
+ ds = klass()
+ ds.add(2L, "four")
+ ds.add(3L, "ea")
+ self.failUnlessEqual(ds.get(2L, 4L), "fear")
+
+
def do_scan(self, klass):
# do a test with gaps and spans of size 1 and 2
# left=(1,11) * right=(1,11) * gapsize=(1,2)
return ds
def dump(s):
p = set(s._dump())
- # wow, this is the first time I've ever wanted ?: in python
- # note: this requires python2.5
- d = "".join([(S[i] if i in p else " ") for i in range(l)])
+ d = "".join([((i not in p) and " " or S[i]) for i in range(l)])
assert len(d) == l
return d
DEBUG = False
created = 0
pieces = []
while created < length:
- piece = md5(seed + str(created)).hexdigest()
+ piece = _hash(seed + str(created)).hexdigest()
pieces.append(piece)
created += len(piece)
return "".join(pieces)[:length]
def _create(subseed):
ns1 = S1(); ns2 = S2()
for i in range(10):
- what = md5(subseed+str(i)).hexdigest()
+ what = _hash(subseed+str(i)).hexdigest()
start = int(what[2:4], 16)
length = max(1,int(what[5:6], 16))
ns1.add(start, _randstr(length, what[7:9]));
#print
for i in range(1000):
- what = md5(seed+str(i)).hexdigest()
+ what = _hash(seed+str(i)).hexdigest()
op = what[0]
subop = what[1]
start = int(what[2:4], 16)
self.failUnlessEqual(d1, d2)
#print "s1 now %s" % list(s1._dump())
#print "s2 now %s" % list(s2._dump())
- self.failUnlessEqual(len(s1), len(s2))
+ self.failUnlessEqual(s1.len(), s2.len())
self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
for j in range(100):
- what = md5(what[12:14]+str(j)).hexdigest()
+ what = _hash(what[12:14]+str(j)).hexdigest()
start = int(what[2:4], 16)
length = max(1, int(what[5:6], 16))
d1 = s1.get(start, length); d2 = s2.get(start, length)