fileutil.write_atomically(fn, "two", mode="") # non-binary
self.failUnlessEqual(fileutil.read(fn), "two")
- def test_NamedTemporaryDirectory(self):
- basedir = "util/FileUtil/test_NamedTemporaryDirectory"
- fileutil.make_dirs(basedir)
- td = fileutil.NamedTemporaryDirectory(dir=basedir)
- name = td.name
- self.failUnless(basedir in name)
- self.failUnless(basedir in repr(td))
- self.failUnless(os.path.isdir(name))
- del td
- # it is conceivable that we need to force gc here, but I'm not sure
- self.failIf(os.path.isdir(name))
-
def test_rename(self):
basedir = "util/FileUtil/test_rename"
fileutil.make_dirs(basedir)
self.failIf(os.path.exists(fn))
self.failUnless(os.path.exists(fn2))
+ def test_rename_no_overwrite(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
+ fileutil.make_dirs(workdir)
+
+ source_path = os.path.join(workdir, "source")
+ dest_path = os.path.join(workdir, "dest")
+
+ # when neither file exists
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+
+ # when only dest exists
+ fileutil.write(dest_path, "dest")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when both exist
+ fileutil.write(source_path, "source")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(source_path), "source")
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when only source exists
+ os.remove(dest_path)
+ fileutil.rename_no_overwrite(source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "source")
+ self.failIf(os.path.exists(source_path))
+
+ def test_replace_file(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
+ fileutil.make_dirs(workdir)
+
+ backup_path = os.path.join(workdir, "backup")
+ replaced_path = os.path.join(workdir, "replaced")
+ replacement_path = os.path.join(workdir, "replacement")
+
+ # when none of the files exist
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+
+ # when only replaced exists
+ fileutil.write(replaced_path, "foo")
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "foo")
+
+ # when both replaced and replacement exist, but not backup
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
+ # when only replacement exists
+ os.remove(backup_path)
+ os.remove(replaced_path)
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+ self.failIf(os.path.exists(backup_path))
+
+ # when replaced, replacement and backup all exist
+ fileutil.write(replaced_path, "foo")
+ fileutil.write(replacement_path, "bar")
+ fileutil.write(backup_path, "bak")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
def test_du(self):
basedir = "util/FileUtil/test_du"
fileutil.make_dirs(basedir)
saved_cwd = os.path.normpath(os.getcwdu())
abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
+ abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
if sys.platform == "win32":
self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
else:
self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
+ self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
# adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>
- self.failUnlessIn(u"foo", fileutil.abspath_expanduser_unicode(u"foo"))
+ foo = fileutil.abspath_expanduser_unicode(u"foo")
+ self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)
+
+ foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
+ self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)
+
+ if sys.platform == "win32":
+ # This is checking that a drive letter is added for a path without one.
+ baz = fileutil.abspath_expanduser_unicode(u"\\baz")
+ self.failUnless(baz.startswith(u"\\\\?\\"), baz)
+ self.failUnlessReallyEqual(baz[5 :], u":\\baz")
+
+ bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
+ self.failUnless(bar.startswith(u"\\\\?\\"), bar)
+ self.failUnlessReallyEqual(bar[5 :], u":\\bar")
+ # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+ self.failUnlessReallyEqual(baz[4], bar[4]) # same drive
+
+ baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
+ self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
+ self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
+
+ bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
+ self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
+ self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
+ # not u":\\baz\\bar", because \bar is absolute on the current drive.
+
+ self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0]) # same drive
+
self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
+ self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
cwds = ['cwd']
try:
for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
uabspath = fileutil.abspath_expanduser_unicode(upath)
self.failUnless(isinstance(uabspath, unicode), uabspath)
+
+ uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
+ self.failUnless(isinstance(uabspath_notlong, unicode), uabspath_notlong)
finally:
os.chdir(saved_cwd)
_cleanup()
self.failIf(os.path.exists(long_path))
- def test_windows_expanduser(self):
+ def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
def call_windows_getenv(name):
- if name == u"HOMEDRIVE": return u"C:"
- if name == u"HOMEPATH": return u"\\Documents and Settings\\\u0100"
+ if name == u"USERPROFILE": return userprofile
+ if name == u"HOMEDRIVE": return homedrive
+ if name == u"HOMEPATH": return homepath
self.fail("unexpected argument to call_windows_getenv")
self.patch(fileutil, 'windows_getenv', call_windows_getenv)
self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")
+ def test_windows_expanduser_xp(self):
+ return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")
+
+ def test_windows_expanduser_win7(self):
+ return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
+
def test_disk_stats(self):
avail = fileutil.get_available_space('.', 2**14)
if avail == 0:
disk = fileutil.get_disk_stats('.', 2**128)
self.failUnlessEqual(disk['avail'], 0)
+ def test_get_pathinfo(self):
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ # create a directory
+ self.mkdir(basedir, "a")
+ dirinfo = fileutil.get_pathinfo(basedir)
+ self.failUnlessTrue(dirinfo.isdir)
+ self.failUnlessTrue(dirinfo.exists)
+ self.failUnlessFalse(dirinfo.isfile)
+ self.failUnlessFalse(dirinfo.islink)
+
+ # create a file
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+ fileinfo = fileutil.get_pathinfo(f)
+ self.failUnlessTrue(fileinfo.isfile)
+ self.failUnlessTrue(fileinfo.exists)
+ self.failUnlessFalse(fileinfo.isdir)
+ self.failUnlessFalse(fileinfo.islink)
+ self.failUnlessEqual(fileinfo.size, 10)
+
+ # path at which nothing exists
+ dnename = os.path.join(basedir, "doesnotexist")
+ now = time.time()
+ dneinfo = fileutil.get_pathinfo(dnename, now=now)
+ self.failUnlessFalse(dneinfo.exists)
+ self.failUnlessFalse(dneinfo.isfile)
+ self.failUnlessFalse(dneinfo.isdir)
+ self.failUnlessFalse(dneinfo.islink)
+ self.failUnlessEqual(dneinfo.size, None)
+ self.failUnlessEqual(dneinfo.mtime, now)
+ self.failUnlessEqual(dneinfo.ctime, now)
+
+ def test_get_pathinfo_symlink(self):
+ if not hasattr(os, 'symlink'):
+ raise unittest.SkipTest("can't create symlinks on this platform")
+
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+
+ # create a symlink pointing to 1.txt
+ slname = os.path.join(basedir, "linkto1.txt")
+ os.symlink(f, slname)
+ symlinkinfo = fileutil.get_pathinfo(slname)
+ self.failUnlessTrue(symlinkinfo.islink)
+ self.failUnlessTrue(symlinkinfo.exists)
+ self.failUnlessFalse(symlinkinfo.isfile)
+ self.failUnlessFalse(symlinkinfo.isdir)
+
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
d.addCallbacks(_suc, _err)
return d
-class DeferredUtilTests(unittest.TestCase):
+class DeferredUtilTests(unittest.TestCase, deferredutil.WaitForDelayedCallsMixin):
def test_gather_results(self):
d1 = defer.Deferred()
d2 = defer.Deferred()
self.failUnless(isinstance(f, Failure))
self.failUnless(f.check(ValueError))
+ def test_wait_for_delayed_calls(self):
+ """
+ This tests that 'wait_for_delayed_calls' does in fact wait for a
+ delayed call that is active when the test returns. If it didn't,
+ Trial would report an unclean reactor error for this test.
+ """
+ def _trigger():
+ #print "trigger"
+ pass
+ reactor.callLater(0.1, _trigger)
+
+ d = defer.succeed(None)
+ d.addBoth(self.wait_for_delayed_calls)
+ return d
+
class HashUtilTests(unittest.TestCase):
def test_random_key(self):
def test_parse_date(self):
self.failUnlessEqual(time_format.parse_date("2010-02-21"), 1266710400)
+ def test_format_time(self):
+ self.failUnlessEqual(time_format.format_time(time.gmtime(0)), '1970-01-01 00:00:00')
+ self.failUnlessEqual(time_format.format_time(time.gmtime(60)), '1970-01-01 00:01:00')
+ self.failUnlessEqual(time_format.format_time(time.gmtime(60*60)), '1970-01-01 01:00:00')
+ seconds_per_day = 60*60*24
+ leap_years_1970_to_2014_inclusive = ((2012 - 1968) // 4)
+ self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2015 - 1970)*365+leap_years_1970_to_2014_inclusive))), '2015-01-01 00:00:00')
+
+ def test_format_time_y2038(self):
+ seconds_per_day = 60*60*24
+ leap_years_1970_to_2047_inclusive = ((2044 - 1968) // 4)
+ self.failUnlessEqual(time_format.format_time(time.gmtime(seconds_per_day*((2048 - 1970)*365+leap_years_1970_to_2047_inclusive))), '2048-01-01 00:00:00')
+
+ test_format_time_y2038.todo = "This test is known to fail on systems with 32-bit time_t."
+
+ def test_format_delta(self):
+ time_1 = 1389812723
+ time_5s_delta = 1389812728
+ time_28m7s_delta = 1389814410
+ time_1h_delta = 1389816323
+ time_1d21h46m49s_delta = 1389977532
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_1), '0s')
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_5s_delta), '5s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_28m7s_delta), '28m 7s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_1h_delta), '1h 0m 0s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1, time_1d21h46m49s_delta), '1d 21h 46m 49s')
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1d21h46m49s_delta, time_1), '-')
+
+ # time_1 with a decimal fraction will make the delta 1s less
+ time_1decimal = 1389812723.383963
+
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_5s_delta), '4s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_28m7s_delta), '28m 6s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_1h_delta), '59m 59s')
+ self.failUnlessEqual(
+ time_format.format_delta(time_1decimal, time_1d21h46m49s_delta), '1d 21h 46m 48s')
+
class CacheDir(unittest.TestCase):
def test_basic(self):
basedir = "test_util/CacheDir/test_basic"