From 328f4b48ffce2a8c2f34a447dc401cd148cd6b41 Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Mon, 18 May 2015 02:35:27 +0100 Subject: [PATCH] WIP. Signed-off-by: Daira Hopwood --- src/allmydata/util/fileutil.py | 27 ++- src/allmydata/windows/inotify.py | 328 ++++++++++++++++++------------- 2 files changed, 211 insertions(+), 144 deletions(-) diff --git a/src/allmydata/util/fileutil.py b/src/allmydata/util/fileutil.py index 54e34484..f0d8812d 100644 --- a/src/allmydata/util/fileutil.py +++ b/src/allmydata/util/fileutil.py @@ -522,8 +522,11 @@ if sys.platform == "win32": from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID # - CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ - (("CreateFileW", windll.kernel32)) + CreateFileW = WINFUNCTYPE( + HANDLE, + LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE, + use_last_error=True + )(("CreateFileW", windll.kernel32)) GENERIC_WRITE = 0x40000000 FILE_SHARE_READ = 0x00000001 @@ -532,14 +535,26 @@ if sys.platform == "win32": INVALID_HANDLE_VALUE = 0xFFFFFFFF # - FlushFileBuffers = WINFUNCTYPE(BOOL, HANDLE)(("FlushFileBuffers", windll.kernel32)) + FlushFileBuffers = WINFUNCTYPE( + BOOL, + HANDLE, + use_last_error=True + )(("FlushFileBuffers", windll.kernel32)) # - CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32)) + CloseHandle = WINFUNCTYPE( + BOOL, + HANDLE, + use_last_error=True + )(("CloseHandle", windll.kernel32)) # def flush_volume(path): + print "flush_volume(%r)" % (path,) drive = os.path.splitdrive(os.path.realpath(path))[0] + if drive.startswith("\\\\?\\"): + drive = drive[4 :] + print "drive = %r" % (drive,) hVolume = CreateFileW(u"\\\\.\\" + drive, GENERIC_WRITE, @@ -550,10 +565,10 @@ if sys.platform == "win32": None ) if hVolume == INVALID_HANDLE_VALUE: - raise WinError() + raise WinError(get_last_error()) if FlushFileBuffers(hVolume) == 0: - raise WinError() + raise WinError(get_last_error()) CloseHandle(hVolume) else: diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py index 3a0f7ba6..ddb25818 100644 --- a/src/allmydata/windows/inotify.py +++ b/src/allmydata/windows/inotify.py @@ -4,7 +4,7 @@ import os, sys -from twisted.internet import reactor +from twisted.internet import defer, reactor from twisted.internet.threads import deferToThread from allmydata.util.fake_inotify import humanReadableMask, \ @@ -19,20 +19,24 @@ from allmydata.util.fake_inotify import humanReadableMask, \ IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED] from allmydata.util.assertutil import _assert, precondition -from allmydata.util.encodingutil import quote_output +from allmydata.util.deferredutil import eventually_callback, eventually_errback +from allmydata.util.encodingutil import quote_local_unicode_path from allmydata.util import log, fileutil from allmydata.util.pollmixin import PollMixin from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, \ - create_string_buffer, addressof, Structure + create_string_buffer, addressof, Structure, get_last_error from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID # FILE_LIST_DIRECTORY = 1 # -CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ - (("CreateFileW", windll.kernel32)) +CreateFileW = WINFUNCTYPE( + HANDLE, + LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE, + use_last_error=True + )(("CreateFileW", windll.kernel32)) FILE_SHARE_READ = 0x00000001 FILE_SHARE_WRITE = 0x00000002 @@ -44,7 +48,11 @@ FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 FILE_FLAG_OVERLAPPED = 0x40000000 # -CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32)) +CloseHandle = WINFUNCTYPE( + BOOL, + HANDLE, + use_last_error=True + )(("CloseHandle", windll.kernel32)) # class OVERLAPPED(Structure): @@ -57,9 +65,11 @@ class OVERLAPPED(Structure): ] # -ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, - POINTER(DWORD), POINTER(OVERLAPPED), LPVOID) \ - (("ReadDirectoryChangesW", windll.kernel32)) +ReadDirectoryChangesW = WINFUNCTYPE( + BOOL, + HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), POINTER(OVERLAPPED), LPVOID, + use_last_error=True + )(("ReadDirectoryChangesW", windll.kernel32)) FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001 FILE_NOTIFY_CHANGE_DIR_NAME = 0x00000002 @@ -95,38 +105,34 @@ _action_to_inotify_mask = { INVALID_HANDLE_VALUE = 0xFFFFFFFF -# -WaitForMultipleObjects = WINFUNCTYPE(DWORD, DWORD, POINTER(HANDLE), BOOL, DWORD) \ - (("WaitForMultipleObjects", windll.kernel32)) - -INFINITE = 0xFFFFFFFF - -WAIT_ABANDONED = 0x00000080 -WAIT_IO_COMPLETION = 0x000000C0 -WAIT_OBJECT_0 = 0x00000000 -WAIT_TIMEOUT = 0x00000102 -WAIT_FAILED = 0xFFFFFFFF - # -CreateEventW = WINFUNCTYPE(HANDLE, LPVOID, BOOL, BOOL, LPCWSTR) \ - (("CreateEventW", windll.kernel32)) - -# -SetEvent = WINFUNCTYPE(BOOL, HANDLE)(("SetEvent", windll.kernel32)) - +CreateEventW = WINFUNCTYPE( + HANDLE, + LPVOID, BOOL, BOOL, LPCWSTR, + use_last_error=True + )(("CreateEventW", windll.kernel32)) + +# +CancelIoEx = WINFUNCTYPE( + BOOL, + HANDLE, POINTER(OVERLAPPED), + use_last_error=True + )(("CancelIoEx", windll.kernel32)) + +# +GetOverlappedResult = WINFUNCTYPE( + BOOL, + HANDLE, POINTER(OVERLAPPED), POINTER(DWORD), BOOL, + use_last_error=True + )(("GetOverlappedResult", windll.kernel32)) + +# +ERROR_OPERATION_ABORTED = 995 + +# Use these rather than False and True for Windows BOOL. FALSE = 0 TRUE = 1 -def _create_event(auto_reset): - # no security descriptor, auto_reset, unsignalled, anonymous - hEvent = CreateEventW(None, auto_reset, FALSE, None) - if hEvent is None: - raise WinError() - -def _signal_event(hEvent): - if SetEvent(hEvent) == 0: - raise WinError() - class StoppedException(Exception): """The notifier has been stopped.""" @@ -146,22 +152,36 @@ class Notification(object): return "Notification(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename) -class FileNotifyInformation(object): +# WARNING: ROCKET SCIENCE! +# ReadDirectoryChangesW is one of the most difficult APIs in Windows. +# The documentation is incomplete and misleading, and many of the possible +# ways of using it do not work in practice. In particular, robustly +# cancelling a call in order to stop monitoring the directory is +# ridiculously hard. +# +# Attempting to use it via ctypes is therefore pure foolishness :-p +# Do not change this without first reading, carefully, both parts of +# . +# Then ask Daira to review your changes. + +class FileNotifier(object): """ - I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can - iterate over those structures, decoding them into Notification objects. + I represent a buffer containing FILE_NOTIFY_INFORMATION structures, + associated with a particular directory handle. I can iterate over those + structures, decoding them into Notification objects. """ - def __init__(self, size=1024): + def __init__(self, path_u, filter, recursive=False, size=1024): + self._hDirectory = self._open_directory(path_u) + self._filter = filter + self._recursive = recursive self._size = size self._buffer = create_string_buffer(size) address = addressof(self._buffer) - _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,)) - - self._hStopped = _create_event(auto_reset=FALSE) - self._hNotified = _create_event(auto_reset=TRUE) - self._events = (HANDLE*2)(self._hStopped, self._hNotified) + _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" + % (address,)) + self._hNotified = self._create_event() self._overlapped = OVERLAPPED() self._overlapped.Internal = None self._overlapped.InternalHigh = None @@ -170,84 +190,111 @@ class FileNotifyInformation(object): self._overlapped.Pointer = None self._overlapped.hEvent = self._hNotified + self._interrupted = False + + @staticmethod + def _create_event(): + # no security descriptor, manual reset, unsignalled, anonymous + hEvent = CreateEventW(None, FALSE, FALSE, None) + if hEvent is None: + raise WinError(get_last_error()) + return hEvent + + @staticmethod + def _open_directory(path_u): + hDirectory = CreateFileW(path_u, + FILE_LIST_DIRECTORY, # access rights + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + # don't prevent other processes from accessing + None, # no security descriptor + OPEN_EXISTING, # directory must already exist + FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED, + # necessary to open a directory with overlapped I/O + None # no template file + ) + if hDirectory == INVALID_HANDLE_VALUE: + e = WinError(get_last_error()) + raise OSError("Opening directory %s gave Windows error %r: %s" % + (quote_local_unicode_path(path_u), e.args[0], e.args[1])) + return hDirectory + def __del__(self): - if hasattr(self, '_hStopped'): - CloseHandle(self._hStopped) + if hasattr(self, '_hDirectory'): + CloseHandle(self._hDirectory) if hasattr(self, '_hNotified'): CloseHandle(self._hNotified) - def stop(self): - _signal_event(self._hStopped) + def interrupt(self): + # This must be repeated until the thread that calls get_notifications() + # is confirmed to be stopped. + self._interrupted = True + CancelIoEx(self._hDirectory, None) - def read_notifications(self, hDirectory, recursive, filter): + def read_notifications(self): """This does not block.""" + if self._interrupted: + raise StoppedException() bytes_returned = DWORD(0) print "here" - r = ReadDirectoryChangesW(hDirectory, - self._buffer, + r = ReadDirectoryChangesW(self._hDirectory, + byref(self._buffer), self._size, - recursive, - filter, + TRUE if self._recursive else FALSE, + self._filter, byref(bytes_returned), self._overlapped, - None # NULL -> no completion routine + None ) + print "there" if r == 0: - raise WinError() + raise WinError(get_last_error()) def get_notifications(self): """This blocks and then iterates over the notifications.""" - - r = WaitForMultipleObjects(2, self._events, - FALSE, # wait for any, not all - INFINITE) - if r == WAIT_FAILED: - raise WinError() - if r == WAIT_OBJECT_0: # hStopped + if self._interrupted: raise StoppedException() - if r != WAIT_OBJECT_0+1: # hNotified - raise OSError("unexpected return from WaitForMultipleObjects: %d" % (r,)) + print "hereq1" + bytes_read = DWORD() + r = GetOverlappedResult(self._hDirectory, + self._overlapped, + byref(bytes_read), + TRUE) + if r == 0: + err = get_last_error() + if err == ERROR_OPERATION_ABORTED: + raise StoppedException() + raise WinError(err) + print "hereq2" + data = self._buffer.raw[:bytes_returned.value] print data pos = 0 while True: bytes = _read_dword(data, pos+8) - s = Notification(_read_dword(data, pos+4), - data[pos+12 : pos+12+bytes].decode('utf-16-le')) + try: + path_u = data[pos+12 : pos+12+bytes].decode('utf-16-le') + except UnicodeDecodeError as e: + log.err(e) + else: + s = Notification(_read_dword(data, pos+4), path_u) + print s + yield s next_entry_offset = _read_dword(data, pos) - print s - yield s if next_entry_offset == 0: break pos = pos + next_entry_offset - -def _read_dword(data, i): - # little-endian - return ( ord(data[i]) | - (ord(data[i+1]) << 8) | - (ord(data[i+2]) << 16) | - (ord(data[i+3]) << 24)) - - -def _open_directory(path_u): - hDirectory = CreateFileW(path_u, - FILE_LIST_DIRECTORY, # access rights - FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, - # don't prevent other processes from accessing - None, # no security descriptor - OPEN_EXISTING, # directory must already exist - FILE_FLAG_BACKUP_SEMANTICS, # necessary to open a directory - None # no template file - ) - if hDirectory == INVALID_HANDLE_VALUE: - e = WinError() - raise OSError("Opening directory %s gave Windows error %r: %s" % (quote_output(path_u), e.args[0], e.args[1])) - return hDirectory + @staticmethod + def _read_dword(data, i): + # little-endian + return ( ord(data[i]) | + (ord(data[i+1]) << 8) | + (ord(data[i+2]) << 16) | + (ord(data[i+3]) << 24)) def simple_test(): @@ -255,24 +302,23 @@ def simple_test(): filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE recursive = False - hDirectory = _open_directory(path_u) - fni = FileNotifyInformation() - print "Waiting..." + notifier = FileNotifier(path_u, filter, recursive) while True: - fni.read_notifications(hDirectory, recursive, filter) - for info in fni.get_notifications(): + notifier.read_notifications() + print "Waiting..." + for info in notifier.get_notifications(): print info class INotify(PollMixin): def __init__(self): - self._fni = FileNotifyInformation() - self._started = False - self._stop = False + self._called_startReading = False + self._called_stopReading = False + self._started_d = defer.Deferred() self._stopped = False - self._filter = None + self._callbacks = None - self._hDirectory = None + self._notifier = None self._path = None self._pending = set() self._pending_delay = 1.0 @@ -282,21 +328,31 @@ class INotify(PollMixin): def startReading(self): # Twisted's version of this is synchronous. + precondition(not self._called_startReading, "startReading() called more than once") + self._called_startReading = True deferToThread(self._thread) - return self.poll(lambda: self._started) + return self._started_d def stopReading(self): - self._stop = True - self._fni.stop() - - def wait_until_stopped(self): - if not self._stop: - self.stopReading() - return self.poll(lambda: self._stopped) + # Twisted's version of this is synchronous. + precondition(self._called_startReading, "stopReading() called before startReading()") + precondition(not self._called_stopReading, "stopReading() called more than once") + self._called_stopReading = True + + # This is tricky. We don't know where the notifier thread is in its loop, + # so it could block in get_notifications *after* any pending I/O has been + # cancelled. Therefore, we need to continue interrupting until we see + # that the thread has actually stopped. + def _try_to_stop(): + if self._stopped: + return True + self._notifier.interrupt() + return False + return self.poll(_try_to_stop) def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): - precondition(not self._started, "watch() can only be called before startReading()") - precondition(self._filter is None, "only one watch is supported") + precondition(not self._started_reading, "watch() can only be called before startReading()") + precondition(self._notifier is None, "only one watch is supported") precondition(isinstance(autoAdd, bool), autoAdd=autoAdd) precondition(isinstance(recursive, bool), recursive=recursive) precondition(not autoAdd, "autoAdd not supported") @@ -307,20 +363,20 @@ class INotify(PollMixin): path_u = path_u.decode(sys.getfilesystemencoding()) _assert(isinstance(path_u, unicode), path_u=path_u) - self._filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE + filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE if mask & (IN_ACCESS | IN_CLOSE_NOWRITE | IN_OPEN): - self._filter = self._filter | FILE_NOTIFY_CHANGE_LAST_ACCESS + filter |= FILE_NOTIFY_CHANGE_LAST_ACCESS if mask & IN_ATTRIB: - self._filter = self._filter | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY + filter |= FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY - self._recursive = recursive self._callbacks = callbacks or [] - self._hDirectory = _open_directory(path_u) + self._notifier = FileNotifier(path_u, filter, recursive) def _thread(self): + started = False try: - _assert(self._filter is not None, "no watch set") + _assert(self._notifier is not None, "no watch set") # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in # . @@ -329,13 +385,13 @@ class INotify(PollMixin): # We must set _started to True *after* calling read_notifications, so that # the caller of startReading() can tell when we've actually started reading. - self._fni.read_notifications(self._hDirectory, self._recursive, self._filter) - self._started = True + self._notifier.read_notifications() + if not started: + reactor.callFromThread(self._started_d.callback, None) + started = True - for info in self._fni.get_notifications(): + for info in self._notifier.get_notifications(): print info - if self._stop: - raise StoppedException() path = self._path.preauthChild(info.filename) # FilePath with Unicode path #mask = _action_to_inotify_mask.get(info.action, IN_CHANGED) @@ -348,20 +404,16 @@ class INotify(PollMixin): for cb in self._callbacks: try: cb(None, path, IN_CHANGED) - except Exception, e: + except Exception as e: log.err(e) reactor.callLater(self._pending_delay, _do_callbacks) reactor.callFromThread(_maybe_notify, path) - except StoppedException: - self._do_stop() - except Exception, e: - log.err(e) - self._do_stop() - raise - - def _do_stop(self): - hDirectory = self._hDirectory - self._callbacks = [] - self._hDirectory = None - CloseHandle(hDirectory) - self._stopped = True + except Exception as e: + if not isinstance(e, StoppedException): + log.err(e) + if not started: + # startReading() should fail in this case. + reactor.callFromThread(self._started_d.errback, Failure()) + finally: + self._callbacks = [] + self._stopped = True -- 2.45.2