]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
WIP.
authorDaira Hopwood <daira@jacaranda.org>
Mon, 18 May 2015 01:35:27 +0000 (02:35 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Mon, 18 May 2015 01:35:27 +0000 (02:35 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/util/fileutil.py
src/allmydata/windows/inotify.py

index f63e68c5392583407543c9438dbf00d78fab54ae..6dcf4ee1072da18394602518e85d336d19db5c4c 100644 (file)
@@ -550,8 +550,11 @@ if sys.platform == "win32":
     from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
 
     # <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
-    CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
-                      (("CreateFileW", windll.kernel32))
+    CreateFileW = WINFUNCTYPE(
+        HANDLE,
+          LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE,
+        use_last_error=True
+      )(("CreateFileW", windll.kernel32))
 
     GENERIC_WRITE        = 0x40000000
     FILE_SHARE_READ      = 0x00000001
@@ -560,14 +563,26 @@ if sys.platform == "win32":
     INVALID_HANDLE_VALUE = 0xFFFFFFFF
 
     # <http://msdn.microsoft.com/en-us/library/aa364439%28v=vs.85%29.aspx>
-    FlushFileBuffers = WINFUNCTYPE(BOOL, HANDLE)(("FlushFileBuffers", windll.kernel32))
+    FlushFileBuffers = WINFUNCTYPE(
+        BOOL,
+          HANDLE,
+        use_last_error=True
+      )(("FlushFileBuffers", windll.kernel32))
 
     # <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
-    CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
+    CloseHandle = WINFUNCTYPE(
+        BOOL,
+          HANDLE,
+        use_last_error=True
+      )(("CloseHandle", windll.kernel32))
 
     # <http://social.msdn.microsoft.com/forums/en-US/netfxbcl/thread/4465cafb-f4ed-434f-89d8-c85ced6ffaa8/>
     def flush_volume(path):
+        print "flush_volume(%r)" % (path,)
         drive = os.path.splitdrive(os.path.realpath(path))[0]
+        if drive.startswith("\\\\?\\"):
+            drive = drive[4 :]
+        print "drive = %r" % (drive,)
 
         hVolume = CreateFileW(u"\\\\.\\" + drive,
                               GENERIC_WRITE,
@@ -578,10 +593,10 @@ if sys.platform == "win32":
                               None
                              )
         if hVolume == INVALID_HANDLE_VALUE:
-            raise WinError()
+            raise WinError(get_last_error())
 
         if FlushFileBuffers(hVolume) == 0:
-            raise WinError()
+            raise WinError(get_last_error())
 
         CloseHandle(hVolume)
 else:
index 3a0f7ba60ec9ca8f021d8864e47076eb562d5744..ddb2581824fc1c460e8d595fa2335621e0d3540a 100644 (file)
@@ -4,7 +4,7 @@
 
 import os, sys
 
-from twisted.internet import reactor
+from twisted.internet import defer, reactor
 from twisted.internet.threads import deferToThread
 
 from allmydata.util.fake_inotify import humanReadableMask, \
@@ -19,20 +19,24 @@ from allmydata.util.fake_inotify import humanReadableMask, \
     IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED]
 
 from allmydata.util.assertutil import _assert, precondition
-from allmydata.util.encodingutil import quote_output
+from allmydata.util.deferredutil import eventually_callback, eventually_errback
+from allmydata.util.encodingutil import quote_local_unicode_path
 from allmydata.util import log, fileutil
 from allmydata.util.pollmixin import PollMixin
 
 from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, \
-    create_string_buffer, addressof, Structure
+    create_string_buffer, addressof, Structure, get_last_error
 from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
 
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/gg258116%28v=vs.85%29.aspx>
 FILE_LIST_DIRECTORY              = 1
 
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa363858%28v=vs.85%29.aspx>
-CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
-                  (("CreateFileW", windll.kernel32))
+CreateFileW = WINFUNCTYPE(
+    HANDLE,
+      LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE,
+    use_last_error=True
+  )(("CreateFileW", windll.kernel32))
 
 FILE_SHARE_READ                  = 0x00000001
 FILE_SHARE_WRITE                 = 0x00000002
@@ -44,7 +48,11 @@ FILE_FLAG_BACKUP_SEMANTICS       = 0x02000000
 FILE_FLAG_OVERLAPPED             = 0x40000000
 
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/ms724211%28v=vs.85%29.aspx>
-CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
+CloseHandle = WINFUNCTYPE(
+    BOOL,
+      HANDLE,
+    use_last_error=True
+  )(("CloseHandle", windll.kernel32))
 
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342%28v=vs.85%29.aspx>
 class OVERLAPPED(Structure):
@@ -57,9 +65,11 @@ class OVERLAPPED(Structure):
                ]
 
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365465%28v=vs.85%29.aspx>
-ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD,
-                                    POINTER(DWORD), POINTER(OVERLAPPED), LPVOID) \
-                            (("ReadDirectoryChangesW", windll.kernel32))
+ReadDirectoryChangesW = WINFUNCTYPE(
+    BOOL,
+      HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), POINTER(OVERLAPPED), LPVOID,
+    use_last_error=True
+  )(("ReadDirectoryChangesW", windll.kernel32))
 
 FILE_NOTIFY_CHANGE_FILE_NAME     = 0x00000001
 FILE_NOTIFY_CHANGE_DIR_NAME      = 0x00000002
@@ -95,38 +105,34 @@ _action_to_inotify_mask = {
 
 INVALID_HANDLE_VALUE             = 0xFFFFFFFF
 
-# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms687025%28v=vs.85%29.aspx>
-WaitForMultipleObjects = WINFUNCTYPE(DWORD, DWORD, POINTER(HANDLE), BOOL, DWORD) \
-                             (("WaitForMultipleObjects", windll.kernel32))
-
-INFINITE           = 0xFFFFFFFF
-
-WAIT_ABANDONED     = 0x00000080
-WAIT_IO_COMPLETION = 0x000000C0
-WAIT_OBJECT_0      = 0x00000000
-WAIT_TIMEOUT       = 0x00000102
-WAIT_FAILED        = 0xFFFFFFFF
-
 # <https://msdn.microsoft.com/en-us/library/windows/desktop/ms682396%28v=vs.85%29.aspx>
-CreateEventW = WINFUNCTYPE(HANDLE, LPVOID, BOOL, BOOL, LPCWSTR) \
-                   (("CreateEventW", windll.kernel32))
-
-# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms686211%28v=vs.85%29.aspx>
-SetEvent = WINFUNCTYPE(BOOL, HANDLE)(("SetEvent", windll.kernel32))
-
+CreateEventW = WINFUNCTYPE(
+    HANDLE,
+      LPVOID, BOOL, BOOL, LPCWSTR,
+    use_last_error=True
+  )(("CreateEventW", windll.kernel32))
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa363792%28v=vs.85%29.aspx>
+CancelIoEx = WINFUNCTYPE(
+    BOOL,
+      HANDLE, POINTER(OVERLAPPED),
+    use_last_error=True
+  )(("CancelIoEx", windll.kernel32))
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms683209%28v=vs.85%29.aspx>
+GetOverlappedResult = WINFUNCTYPE(
+    BOOL,
+      HANDLE, POINTER(OVERLAPPED), POINTER(DWORD), BOOL,
+    use_last_error=True
+  )(("GetOverlappedResult", windll.kernel32))
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms681388%28v=vs.85%29.aspx>
+ERROR_OPERATION_ABORTED = 995
+
+# Use these rather than False and True for Windows BOOL.
 FALSE = 0
 TRUE  = 1
 
-def _create_event(auto_reset):
-    # no security descriptor, auto_reset, unsignalled, anonymous
-    hEvent = CreateEventW(None, auto_reset, FALSE, None)
-    if hEvent is None:
-        raise WinError()
-
-def _signal_event(hEvent):
-    if SetEvent(hEvent) == 0:
-        raise WinError()
-
 
 class StoppedException(Exception):
     """The notifier has been stopped."""
@@ -146,22 +152,36 @@ class Notification(object):
         return "Notification(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
 
 
-class FileNotifyInformation(object):
+# WARNING: ROCKET SCIENCE!
+# ReadDirectoryChangesW is one of the most difficult APIs in Windows.
+# The documentation is incomplete and misleading, and many of the possible
+# ways of using it do not work in practice. In particular, robustly
+# cancelling a call in order to stop monitoring the directory is
+# ridiculously hard.
+#
+# Attempting to use it via ctypes is therefore pure foolishness :-p
+# Do not change this without first reading, carefully, both parts of
+# <http://qualapps.blogspot.co.uk/2010/05/understanding-readdirectorychangesw.html>.
+# Then ask Daira to review your changes.
+
+class FileNotifier(object):
     """
-    I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can
-    iterate over those structures, decoding them into Notification objects.
+    I represent a buffer containing FILE_NOTIFY_INFORMATION structures,
+    associated with a particular directory handle. I can iterate over those
+    structures, decoding them into Notification objects.
     """
 
-    def __init__(self, size=1024):
+    def __init__(self, path_u, filter, recursive=False, size=1024):
+        self._hDirectory = self._open_directory(path_u)
+        self._filter = filter
+        self._recursive = recursive
         self._size = size
         self._buffer = create_string_buffer(size)
         address = addressof(self._buffer)
-        _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,))
-
-        self._hStopped  = _create_event(auto_reset=FALSE)
-        self._hNotified = _create_event(auto_reset=TRUE)
-        self._events = (HANDLE*2)(self._hStopped, self._hNotified)
+        _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned"
+                                  % (address,))
 
+        self._hNotified = self._create_event()
         self._overlapped = OVERLAPPED()
         self._overlapped.Internal = None
         self._overlapped.InternalHigh = None
@@ -170,84 +190,111 @@ class FileNotifyInformation(object):
         self._overlapped.Pointer = None
         self._overlapped.hEvent = self._hNotified
 
+        self._interrupted = False
+
+    @staticmethod
+    def _create_event():
+        # no security descriptor, manual reset, unsignalled, anonymous
+        hEvent = CreateEventW(None, FALSE, FALSE, None)
+        if hEvent is None:
+            raise WinError(get_last_error())
+        return hEvent
+
+    @staticmethod
+    def _open_directory(path_u):
+        hDirectory = CreateFileW(path_u,
+                                                                FILE_LIST_DIRECTORY,         # access rights
+                                                                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+                                                                                                                         # don't prevent other processes from accessing
+                                                                None,                        # no security descriptor
+                                                                OPEN_EXISTING,               # directory must already exist
+                                                                FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
+                                                                                                                         # necessary to open a directory with overlapped I/O
+                                                                None                         # no template file
+                                                               )
+        if hDirectory == INVALID_HANDLE_VALUE:
+            e = WinError(get_last_error())
+            raise OSError("Opening directory %s gave Windows error %r: %s" %
+                          (quote_local_unicode_path(path_u), e.args[0], e.args[1]))
+        return hDirectory
+
     def __del__(self):
-        if hasattr(self, '_hStopped'):
-            CloseHandle(self._hStopped)
+        if hasattr(self, '_hDirectory'):
+            CloseHandle(self._hDirectory)
         if hasattr(self, '_hNotified'):
             CloseHandle(self._hNotified)
 
-    def stop(self):
-        _signal_event(self._hStopped)
+    def interrupt(self):
+        # This must be repeated until the thread that calls get_notifications()
+        # is confirmed to be stopped.
+        self._interrupted = True
+        CancelIoEx(self._hDirectory, None)
 
-    def read_notifications(self, hDirectory, recursive, filter):
+    def read_notifications(self):
         """This does not block."""
+        if self._interrupted:
+            raise StoppedException()
 
         bytes_returned = DWORD(0)
         print "here"
-        r = ReadDirectoryChangesW(hDirectory,
-                                  self._buffer,
+        r = ReadDirectoryChangesW(self._hDirectory,
+                                  byref(self._buffer),
                                   self._size,
-                                  recursive,
-                                  filter,
+                                  TRUE if self._recursive else FALSE,
+                                  self._filter,
                                   byref(bytes_returned),
                                   self._overlapped,
-                                  None   # NULL -> no completion routine
+                                  None
                                  )
+        print "there"
         if r == 0:
-            raise WinError()
+            raise WinError(get_last_error())
 
     def get_notifications(self):
         """This blocks and then iterates over the notifications."""
-
-        r = WaitForMultipleObjects(2, self._events,
-                                   FALSE, # wait for any, not all
-                                   INFINITE)
-        if r == WAIT_FAILED:
-            raise WinError()
-        if r == WAIT_OBJECT_0:  # hStopped
+        if self._interrupted:
             raise StoppedException()
-        if r != WAIT_OBJECT_0+1:  # hNotified
-            raise OSError("unexpected return from WaitForMultipleObjects: %d" % (r,))
 
+        print "hereq1"
+        bytes_read = DWORD()
+        r = GetOverlappedResult(self._hDirectory,
+                                self._overlapped,
+                                byref(bytes_read),
+                                TRUE)
+        if r == 0:
+            err = get_last_error()
+            if err == ERROR_OPERATION_ABORTED:
+                raise StoppedException()
+            raise WinError(err)
+        print "hereq2"
+        
         data = self._buffer.raw[:bytes_returned.value]
         print data
 
         pos = 0
         while True:
             bytes = _read_dword(data, pos+8)
-            s = Notification(_read_dword(data, pos+4),
-                             data[pos+12 : pos+12+bytes].decode('utf-16-le'))
+            try:
+                path_u = data[pos+12 : pos+12+bytes].decode('utf-16-le')
+            except UnicodeDecodeError as e:
+                log.err(e)
+            else:
+                s = Notification(_read_dword(data, pos+4), path_u)
+                print s
+                yield s
 
             next_entry_offset = _read_dword(data, pos)
-            print s
-            yield s
             if next_entry_offset == 0:
                 break
             pos = pos + next_entry_offset
 
-
-def _read_dword(data, i):
-    # little-endian
-    return ( ord(data[i])          |
-            (ord(data[i+1]) <<  8) |
-            (ord(data[i+2]) << 16) |
-            (ord(data[i+3]) << 24))
-
-
-def _open_directory(path_u):
-    hDirectory = CreateFileW(path_u,
-                             FILE_LIST_DIRECTORY,         # access rights
-                             FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
-                                                          # don't prevent other processes from accessing
-                             None,                        # no security descriptor
-                             OPEN_EXISTING,               # directory must already exist
-                             FILE_FLAG_BACKUP_SEMANTICS,  # necessary to open a directory
-                             None                         # no template file
-                            )
-    if hDirectory == INVALID_HANDLE_VALUE:
-        e = WinError()
-        raise OSError("Opening directory %s gave Windows error %r: %s" % (quote_output(path_u), e.args[0], e.args[1]))
-    return hDirectory
+    @staticmethod
+    def _read_dword(data, i):
+        # little-endian
+        return ( ord(data[i])          |
+                (ord(data[i+1]) <<  8) |
+                (ord(data[i+2]) << 16) |
+                (ord(data[i+3]) << 24))
 
 
 def simple_test():
@@ -255,24 +302,23 @@ def simple_test():
     filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE
     recursive = False
 
-    hDirectory = _open_directory(path_u)
-    fni = FileNotifyInformation()
-    print "Waiting..."
+    notifier = FileNotifier(path_u, filter, recursive)
     while True:
-        fni.read_notifications(hDirectory, recursive, filter)
-        for info in fni.get_notifications():
+        notifier.read_notifications()
+        print "Waiting..."
+        for info in notifier.get_notifications():
             print info
 
 
 class INotify(PollMixin):
     def __init__(self):
-        self._fni = FileNotifyInformation()
-        self._started = False
-        self._stop = False
+        self._called_startReading = False
+        self._called_stopReading = False
+        self._started_d = defer.Deferred()
         self._stopped = False
-        self._filter = None
+
         self._callbacks = None
-        self._hDirectory = None
+        self._notifier = None
         self._path = None
         self._pending = set()
         self._pending_delay = 1.0
@@ -282,21 +328,31 @@ class INotify(PollMixin):
 
     def startReading(self):
         # Twisted's version of this is synchronous.
+        precondition(not self._called_startReading, "startReading() called more than once")
+        self._called_startReading = True
         deferToThread(self._thread)
-        return self.poll(lambda: self._started)
+        return self._started_d
 
     def stopReading(self):
-        self._stop = True
-        self._fni.stop()
-
-    def wait_until_stopped(self):
-        if not self._stop:
-            self.stopReading()
-        return self.poll(lambda: self._stopped)
+        # Twisted's version of this is synchronous.
+        precondition(self._called_startReading, "stopReading() called before startReading()")
+        precondition(not self._called_stopReading, "stopReading() called more than once")
+        self._called_stopReading = True
+
+        # This is tricky. We don't know where the notifier thread is in its loop,
+        # so it could block in get_notifications *after* any pending I/O has been
+        # cancelled. Therefore, we need to continue interrupting until we see
+        # that the thread has actually stopped.
+        def _try_to_stop():
+            if self._stopped:
+                return True
+            self._notifier.interrupt()
+            return False
+        return self.poll(_try_to_stop)
 
     def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
-        precondition(not self._started, "watch() can only be called before startReading()")
-        precondition(self._filter is None, "only one watch is supported")
+        precondition(not self._started_reading, "watch() can only be called before startReading()")
+        precondition(self._notifier is None, "only one watch is supported")
         precondition(isinstance(autoAdd, bool), autoAdd=autoAdd)
         precondition(isinstance(recursive, bool), recursive=recursive)
         precondition(not autoAdd, "autoAdd not supported")
@@ -307,20 +363,20 @@ class INotify(PollMixin):
             path_u = path_u.decode(sys.getfilesystemencoding())
             _assert(isinstance(path_u, unicode), path_u=path_u)
 
-        self._filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE
+        filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE
 
         if mask & (IN_ACCESS | IN_CLOSE_NOWRITE | IN_OPEN):
-            self._filter = self._filter | FILE_NOTIFY_CHANGE_LAST_ACCESS
+            filter |= FILE_NOTIFY_CHANGE_LAST_ACCESS
         if mask & IN_ATTRIB:
-            self._filter = self._filter | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY
+            filter |= FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY
 
-        self._recursive = recursive
         self._callbacks = callbacks or []
-        self._hDirectory = _open_directory(path_u)
+        self._notifier = FileNotifier(path_u, filter, recursive)
 
     def _thread(self):
+        started = False
         try:
-            _assert(self._filter is not None, "no watch set")
+            _assert(self._notifier is not None, "no watch set")
 
             # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in
             # <http://twistedmatrix.com/documents/current/core/howto/threading.html>.
@@ -329,13 +385,13 @@ class INotify(PollMixin):
                 # We must set _started to True *after* calling read_notifications, so that
                 # the caller of startReading() can tell when we've actually started reading.
 
-                self._fni.read_notifications(self._hDirectory, self._recursive, self._filter)
-                self._started = True
+                self._notifier.read_notifications()
+                if not started:
+                    reactor.callFromThread(self._started_d.callback, None)
+                    started = True
 
-                for info in self._fni.get_notifications():
+                for info in self._notifier.get_notifications():
                     print info
-                    if self._stop:
-                        raise StoppedException()
 
                     path = self._path.preauthChild(info.filename)  # FilePath with Unicode path
                     #mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
@@ -348,20 +404,16 @@ class INotify(PollMixin):
                                 for cb in self._callbacks:
                                     try:
                                         cb(None, path, IN_CHANGED)
-                                    except Exception, e:
+                                    except Exception as e:
                                         log.err(e)
                             reactor.callLater(self._pending_delay, _do_callbacks)
                     reactor.callFromThread(_maybe_notify, path)
-        except StoppedException:
-            self._do_stop()
-        except Exception, e:
-            log.err(e)
-            self._do_stop()
-            raise
-
-    def _do_stop(self):
-        hDirectory = self._hDirectory
-        self._callbacks = []
-        self._hDirectory = None
-        CloseHandle(hDirectory)
-        self._stopped = True
+        except Exception as e:
+            if not isinstance(e, StoppedException):
+                log.err(e)
+            if not started:
+                # startReading() should fail in this case.
+                reactor.callFromThread(self._started_d.errback, Failure())
+        finally:
+            self._callbacks = []
+            self._stopped = True