from allmydata.util import statistics, dictutil, pipeline
from allmydata.util import log as tahoe_log
from allmydata.util.spans import Spans, overlap, DataSpans
+from allmydata.test.common_util import ReallyEqualMixin
+
class Base32(unittest.TestCase):
def test_b2a_matches_Pythons(self):
m = self.should_assert(f, False, othermsg="message2")
self.failUnlessEqual("postcondition: othermsg: 'message2' <type 'str'>", m)
-class FileUtil(unittest.TestCase):
+class FileUtil(ReallyEqualMixin, unittest.TestCase):
def mkdir(self, basedir, path, mode=0777):
fn = os.path.join(basedir, path)
fileutil.make_dirs(fn, mode)
fileutil.write_atomically(fn, "two", mode="") # non-binary
self.failUnlessEqual(fileutil.read(fn), "two")
- def test_open_or_create(self):
- basedir = "util/FileUtil/test_open_or_create"
- fileutil.make_dirs(basedir)
- fn = os.path.join(basedir, "here")
- f = fileutil.open_or_create(fn)
- f.write("stuff.")
- f.close()
- f = fileutil.open_or_create(fn)
- f.seek(0, os.SEEK_END)
- f.write("more.")
- f.close()
- f = open(fn, "r")
- data = f.read()
- f.close()
- self.failUnlessEqual(data, "stuff.more.")
-
def test_NamedTemporaryDirectory(self):
basedir = "util/FileUtil/test_NamedTemporaryDirectory"
fileutil.make_dirs(basedir)
abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
- self.failUnlessEqual(abspath_cwd, saved_cwd)
+ if sys.platform == "win32":
+ self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
+ else:
+ self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
+
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
+ self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")
# adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
- self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+ foo = fileutil.abspath_expanduser_unicode(u"foo")
+ self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
+
+ foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
+ self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
+
+ if sys.platform == "win32":
+ # This is checking that a drive letter is added for a path without one.
+ baz = fileutil.abspath_expanduser_unicode(u"\\baz")
+ self.failUnless(baz.startswith(u"\\\\?\\"), baz)
+ self.failUnlessReallyEqual(baz[5 :], u":\\baz")
+
+ bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
+ self.failUnless(bar.startswith(u"\\\\?\\"), bar)
+ self.failUnlessReallyEqual(bar[5 :], u":\\bar")
+ # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+ self.failUnlessReallyEqual(baz[4], bar[4]) # same drive
+
self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
cwds = ['cwd']
finally:
os.chdir(saved_cwd)
+ def test_create_long_path(self):
+ workdir = u"test_create_long_path"
+ fileutil.make_dirs(workdir)
+ long_path = fileutil.abspath_expanduser_unicode(os.path.join(workdir, u'x'*255))
+ def _cleanup():
+ fileutil.remove(long_path)
+ self.addCleanup(_cleanup)
+
+ fileutil.write(long_path, "test")
+ self.failUnless(os.path.exists(long_path))
+ self.failUnlessEqual(fileutil.read(long_path), "test")
+ _cleanup()
+ self.failIf(os.path.exists(long_path))
+
+ def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
+ def call_windows_getenv(name):
+ if name == u"USERPROFILE": return userprofile
+ if name == u"HOMEDRIVE": return homedrive
+ if name == u"HOMEPATH": return homepath
+ self.fail("unexpected argument to call_windows_getenv")
+ self.patch(fileutil, 'windows_getenv', call_windows_getenv)
+
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
+ self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
+
+ def test_windows_expanduser_xp(self):
+ return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
+
+ def test_windows_expanduser_win7(self):
+ return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+
def test_disk_stats(self):
avail = fileutil.get_available_space('.', 2**14)
if avail == 0:
disk = fileutil.get_disk_stats('.', 2**13)
self.failUnless(disk['total'] > 0, disk['total'])
- self.failUnless(disk['used'] > 0, disk['used'])
+ # we tolerate used==0 for a Travis-CI bug, see #2290
+ self.failUnless(disk['used'] >= 0, disk['used'])
self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
self.failUnless(disk['avail'] > 0, disk['avail'])
d.addCallbacks(_suc, _err)
return d
-class DeferredUtilTests(unittest.TestCase):
+class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
def test_gather_results(self):
d1 = defer.Deferred()
d2 = defer.Deferred()
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(ValueError))
+ def test_wait_for_delayed_calls(self):
+ """
+ This tests that 'wait_for_delayed_calls' does in fact wait for a
+ delayed call that is active when the test returns. If it didn't,
+ Trial would report an unclean reactor error for this test.
+ """
+ def _trigger():
+ #print "trigger"
+ pass
+ reactor.callLater(0.1, _trigger)
+
+ d = defer.succeed(None)
+ d.addBoth(self.wait_for_delayed_calls)
+ return d
+
class HashUtilTests(unittest.TestCase):
def test_random_key(self):
h2.update("foo")
self.failUnlessEqual(h1, h2.digest())
- def test_constant_time_compare(self):
- self.failUnless(hashutil.constant_time_compare("a", "a"))
- self.failUnless(hashutil.constant_time_compare("ab", "ab"))
- self.failIf(hashutil.constant_time_compare("a", "b"))
- self.failIf(hashutil.constant_time_compare("a", "aa"))
+ def test_timing_safe_compare(self):
+ self.failUnless(hashutil.timing_safe_compare("a", "a"))
+ self.failUnless(hashutil.timing_safe_compare("ab", "ab"))
+ self.failIf(hashutil.timing_safe_compare("a", "b"))
+ self.failIf(hashutil.timing_safe_compare("a", "aa"))
def _testknown(self, hashf, expected_a, *args):
got = hashf(*args)
self.failUnlessEqual(ds.len(), 0)
self.failUnlessEqual(list(ds._dump()), [])
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
- s = ds.get_spans()
+ s1 = ds.get_spans()
self.failUnlessEqual(ds.get(0, 4), None)
self.failUnlessEqual(ds.pop(0, 4), None)
ds.remove(0, 4)
self.failUnlessEqual(ds.len(), 4)
self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
- s = ds.get_spans()
- self.failUnless((2,2) in s)
+ s1 = ds.get_spans()
+ self.failUnless((2,2) in s1)
self.failUnlessEqual(ds.get(0, 4), None)
self.failUnlessEqual(ds.pop(0, 4), None)
self.failUnlessEqual(ds.get(4, 4), None)