from collections import namedtuple
from errno import ENOENT
+if sys.platform == "win32":
+ from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, c_ulonglong, \
+ create_unicode_buffer, get_last_error
+ from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPWSTR, LPVOID
+
from twisted.python import log
from pycryptopp.cipher.aes import AES
have_GetDiskFreeSpaceExW = False
if sys.platform == "win32":
- from ctypes import WINFUNCTYPE, windll, POINTER, byref, c_ulonglong, create_unicode_buffer, \
- get_last_error
- from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPWSTR
-
# <http://msdn.microsoft.com/en-us/library/windows/desktop/ms683188%28v=vs.85%29.aspx>
GetEnvironmentVariableW = WINFUNCTYPE(
DWORD, LPCWSTR, LPWSTR, DWORD,
return 0
+class ConflictError(Exception):
+ pass
+
+class UnableToUnlinkReplacementError(Exception):
+ pass
+
+def reraise(wrapper):
+ _, exc, tb = sys.exc_info()
+ wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc))
+ raise wrapper_exc.__class__, wrapper_exc, tb
+
+if sys.platform == "win32":
+ # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx>
+ ReplaceFileW = WINFUNCTYPE(
+ BOOL, LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID,
+ use_last_error=True
+ )(("ReplaceFileW", windll.kernel32))
+
+ REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002
+
+ # <https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx>
+ ERROR_FILE_NOT_FOUND = 2
+
+ def rename_no_overwrite(source_path, dest_path):
+ os.rename(source_path, dest_path)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ r = ReplaceFileW(replaced_path, replacement_path, backup_path,
+ REPLACEFILE_IGNORE_MERGE_ERRORS, None, None)
+ if r == 0:
+ # The UnableToUnlinkReplacementError case does not happen on Windows;
+ # all errors should be treated as signalling a conflict.
+ err = get_last_error()
+ if err != ERROR_FILE_NOT_FOUND:
+ raise ConflictError("WinError: %s" % (WinError(err),))
+
+ try:
+ rename_no_overwrite(replacement_path, replaced_path)
+ except EnvironmentError:
+ reraise(ConflictError)
+else:
+ def rename_no_overwrite(source_path, dest_path):
+ # link will fail with EEXIST if there is already something at dest_path.
+ os.link(source_path, dest_path)
+ try:
+ os.unlink(source_path)
+ except EnvironmentError:
+ reraise(UnableToUnlinkReplacementError)
+
+ def replace_file(replaced_path, replacement_path, backup_path):
+ precondition_abspath(replaced_path)
+ precondition_abspath(replacement_path)
+ precondition_abspath(backup_path)
+
+ if not os.path.exists(replacement_path):
+ raise ConflictError("Replacement file not found: %r" % (replacement_path,))
+
+ try:
+ os.rename(replaced_path, backup_path)
+ except OSError as e:
+ if e.errno != ENOENT:
+ raise
+ try:
+ rename_no_overwrite(replacement_path, replaced_path)
+ except EnvironmentError:
+ reraise(ConflictError)
+
PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime')
def get_pathinfo(path_u, now=None):