self.failIf(os.path.exists(fn))
self.failUnless(os.path.exists(fn2))
+ def test_rename_no_overwrite(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
+ fileutil.make_dirs(workdir)
+
+ source_path = os.path.join(workdir, "source")
+ dest_path = os.path.join(workdir, "dest")
+
+ # when neither file exists
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+
+ # when only dest exists
+ fileutil.write(dest_path, "dest")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when both exist
+ fileutil.write(source_path, "source")
+ self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(source_path), "source")
+ self.failUnlessEqual(fileutil.read(dest_path), "dest")
+
+ # when only source exists
+ os.remove(dest_path)
+ fileutil.rename_no_overwrite(source_path, dest_path)
+ self.failUnlessEqual(fileutil.read(dest_path), "source")
+ self.failIf(os.path.exists(source_path))
+
+ def test_replace_file(self):
+ workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
+ fileutil.make_dirs(workdir)
+
+ backup_path = os.path.join(workdir, "backup")
+ replaced_path = os.path.join(workdir, "replaced")
+ replacement_path = os.path.join(workdir, "replacement")
+
+ # when none of the files exist
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+
+ # when only replaced exists
+ fileutil.write(replaced_path, "foo")
+ self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "foo")
+
+ # when both replaced and replacement exist, but not backup
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
+ # when only replacement exists
+ os.remove(backup_path)
+ os.remove(replaced_path)
+ fileutil.write(replacement_path, "bar")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+ self.failIf(os.path.exists(backup_path))
+
+ # when replaced, replacement and backup all exist
+ fileutil.write(replaced_path, "foo")
+ fileutil.write(replacement_path, "bar")
+ fileutil.write(backup_path, "bak")
+ fileutil.replace_file(replaced_path, replacement_path, backup_path)
+ self.failUnlessEqual(fileutil.read(backup_path), "foo")
+ self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+ self.failIf(os.path.exists(replacement_path))
+
def test_du(self):
basedir = "util/FileUtil/test_du"
fileutil.make_dirs(basedir)
disk = fileutil.get_disk_stats('.', 2**128)
self.failUnlessEqual(disk['avail'], 0)
+ def test_get_pathinfo(self):
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ # create a directory
+ self.mkdir(basedir, "a")
+ dirinfo = fileutil.get_pathinfo(basedir)
+ self.failUnlessTrue(dirinfo.isdir)
+ self.failUnlessTrue(dirinfo.exists)
+ self.failUnlessFalse(dirinfo.isfile)
+ self.failUnlessFalse(dirinfo.islink)
+
+ # create a file
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+ fileinfo = fileutil.get_pathinfo(f)
+ self.failUnlessTrue(fileinfo.isfile)
+ self.failUnlessTrue(fileinfo.exists)
+ self.failUnlessFalse(fileinfo.isdir)
+ self.failUnlessFalse(fileinfo.islink)
+ self.failUnlessEqual(fileinfo.size, 10)
+
+ # path at which nothing exists
+ dnename = os.path.join(basedir, "doesnotexist")
+ now = time.time()
+ dneinfo = fileutil.get_pathinfo(dnename, now=now)
+ self.failUnlessFalse(dneinfo.exists)
+ self.failUnlessFalse(dneinfo.isfile)
+ self.failUnlessFalse(dneinfo.isdir)
+ self.failUnlessFalse(dneinfo.islink)
+ self.failUnlessEqual(dneinfo.size, None)
+ self.failUnlessEqual(dneinfo.mtime, now)
+ self.failUnlessEqual(dneinfo.ctime, now)
+
+ def test_get_pathinfo_symlink(self):
+ if not hasattr(os, 'symlink'):
+ raise unittest.SkipTest("can't create symlinks on this platform")
+
+ basedir = "util/FileUtil/test_get_pathinfo"
+ fileutil.make_dirs(basedir)
+
+ f = os.path.join(basedir, "1.txt")
+ fileutil.write(f, "a"*10)
+
+ # create a symlink pointing to 1.txt
+ slname = os.path.join(basedir, "linkto1.txt")
+ os.symlink(f, slname)
+ symlinkinfo = fileutil.get_pathinfo(slname)
+ self.failUnlessTrue(symlinkinfo.islink)
+ self.failUnlessTrue(symlinkinfo.exists)
+ self.failUnlessFalse(symlinkinfo.isfile)
+ self.failUnlessFalse(symlinkinfo.isdir)
+
+
class PollMixinTests(unittest.TestCase):
def setUp(self):
self.pm = pollmixin.PollMixin()
"""
import sys, exceptions, os, stat, tempfile, time, binascii
+from collections import namedtuple
+from errno import ENOENT
if sys.platform == "win32":
from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, c_ulonglong, \
create_unicode_buffer, get_last_error
- from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPWSTR
+ from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPWSTR, LPVOID
from twisted.python import log
except EnvironmentError:
log.msg("OS call to get disk statistics failed")
return 0
+
+
+class ConflictError(Exception):
+ pass
+
+class UnableToUnlinkReplacementError(Exception):
+ pass
+
+def reraise(wrapper):
+ _, exc, tb = sys.exc_info()
+ wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc))
+ raise wrapper_exc.__class__, wrapper_exc, tb
+
+if sys.platform == "win32":
+ # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx>
+ ReplaceFileW = WINFUNCTYPE(
+ BOOL, LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID,
+ use_last_error=True
+ )(("ReplaceFileW", windll.kernel32))
+
+ REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002
+
+ # <https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx>
+ ERROR_FILE_NOT_FOUND = 2
+
+ def rename_no_overwrite(source_path, dest_path):
+ os.rename(source_path, dest_path)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ r = ReplaceFileW(replaced_path, replacement_path, backup_path,
+ REPLACEFILE_IGNORE_MERGE_ERRORS, None, None)
+ if r == 0:
+ # The UnableToUnlinkReplacementError case does not happen on Windows;
+ # all errors should be treated as signalling a conflict.
+ err = get_last_error()
+ if err != ERROR_FILE_NOT_FOUND:
+ raise ConflictError("WinError: %s" % (WinError(err),))
+
+ try:
+ rename_no_overwrite(replacement_path, replaced_path)
+ except EnvironmentError:
+ reraise(ConflictError)
+else:
+ def rename_no_overwrite(source_path, dest_path):
+ # link will fail with EEXIST if there is already something at dest_path.
+ os.link(source_path, dest_path)
+ try:
+ os.unlink(source_path)
+ except EnvironmentError:
+ reraise(UnableToUnlinkReplacementError)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ if not os.path.exists(replacement_path):
+ raise ConflictError("Replacement file not found: %r" % (replacement_path,))
+
+ try:
+ os.rename(replaced_path, backup_path)
+ except OSError as e:
+ if e.errno != ENOENT:
+ raise
+ try:
+ rename_no_overwrite(replacement_path, replaced_path)
+ except EnvironmentError:
+ reraise(ConflictError)
+
+PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime')
+
+def get_pathinfo(path_u, now=None):
+ try:
+ statinfo = os.lstat(path_u)
+ mode = statinfo.st_mode
+ return PathInfo(isdir =stat.S_ISDIR(mode),
+ isfile=stat.S_ISREG(mode),
+ islink=stat.S_ISLNK(mode),
+ exists=True,
+ size =statinfo.st_size,
+ mtime =statinfo.st_mtime,
+ ctime =statinfo.st_ctime,
+ )
+ except OSError as e:
+ if e.errno == ENOENT:
+ if now is None:
+ now = time.time()
+ return PathInfo(isdir =False,
+ isfile=False,
+ islink=False,
+ exists=False,
+ size =None,
+ mtime =now,
+ ctime =now,
+ )
+ raise