WIP 1431.windows-drop-upload.3
authorDaira Hopwood <daira@jacaranda.org>
Wed, 29 Apr 2015 15:19:53 +0000 (16:19 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Wed, 29 Apr 2015 15:19:53 +0000 (16:19 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/frontends/drop_upload.py
src/allmydata/test/test_drop_upload.py
src/allmydata/windows/inotify.py

index f765df3d3d1a312f2027600b21006aef7966d23d..8bcb03d7da817c741bc9940549c5532cabf2d21e 100644 (file)
@@ -121,11 +121,12 @@ class DropUploader(service.MultiService):
             return Failure(Exception('ERROR: Unable to load magic folder db.'))
 
         service.MultiService.startService(self)
-        d = self._notifier.startReading()
-
-        self._scan(self._local_dir)
 
+        # startReading can be asynchronous [Windows] or synchronous [Twisted's INotify].
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self._notifier.startReading())
         self._stats_provider.count('drop_upload.dirs_monitored', 1)
+        self._scan(self._local_dir)
         return d
 
     def _add_to_dequeue(self, path):
index 01fb4530fab6a0b236c1ef43c6fb39ff270086ba..cf84e6e143e74de3878712878ff1b863cd25124c 100644 (file)
@@ -63,6 +63,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA
     def _cleanup(self, res):
         d = defer.succeed(None)
         if self.uploader is not None:
+            print "XXX"
             d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
         d.addCallback(lambda ign: res)
         return d
index 1a719f9afd9a2fb51fdd96e0ef97ad2c0a270cc1..4255b6fd842b88bd0bb98bc104cc6dcfb733295a 100644 (file)
@@ -23,13 +23,14 @@ from allmydata.util.encodingutil import quote_output
 from allmydata.util import log, fileutil
 from allmydata.util.pollmixin import PollMixin
 
-from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, create_string_buffer, addressof
+from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, \
+    create_string_buffer, addressof, Structure
 from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
 
-# <http://msdn.microsoft.com/en-us/library/gg258116%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/gg258116%28v=vs.85%29.aspx>
 FILE_LIST_DIRECTORY              = 1
 
-# <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa363858%28v=vs.85%29.aspx>
 CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
                   (("CreateFileW", windll.kernel32))
 
@@ -40,12 +41,24 @@ FILE_SHARE_DELETE                = 0x00000004
 OPEN_EXISTING                    = 3
 
 FILE_FLAG_BACKUP_SEMANTICS       = 0x02000000
+FILE_FLAG_OVERLAPPED             = 0x40000000
 
-# <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms724211%28v=vs.85%29.aspx>
 CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
 
-# <http://msdn.microsoft.com/en-us/library/aa365465%28v=vs.85%29.aspx>
-ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), LPVOID, LPVOID) \
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342%28v=vs.85%29.aspx>
+class OVERLAPPED(Structure):
+    _fields_ = [('Internal', LPVOID),
+                ('InternalHigh', LPVOID),
+                ('Offset', DWORD),
+                ('OffsetHigh', DWORD),
+                ('Pointer', LPVOID),
+                ('hEvent', HANDLE),
+               ]
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365465%28v=vs.85%29.aspx>
+ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD,
+                                    POINTER(DWORD), POINTER(OVERLAPPED), LPVOID) \
                             (("ReadDirectoryChangesW", windll.kernel32))
 
 FILE_NOTIFY_CHANGE_FILE_NAME     = 0x00000001
@@ -57,7 +70,7 @@ FILE_NOTIFY_CHANGE_LAST_ACCESS   = 0x00000020
 #FILE_NOTIFY_CHANGE_CREATION     = 0x00000040
 FILE_NOTIFY_CHANGE_SECURITY      = 0x00000100
 
-# <http://msdn.microsoft.com/en-us/library/aa364391%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa364391%28v=vs.85%29.aspx>
 FILE_ACTION_ADDED                = 0x00000001
 FILE_ACTION_REMOVED              = 0x00000002
 FILE_ACTION_MODIFIED             = 0x00000003
@@ -82,8 +95,45 @@ _action_to_inotify_mask = {
 
 INVALID_HANDLE_VALUE             = 0xFFFFFFFF
 
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms687025%28v=vs.85%29.aspx>
+WaitForMultipleObjects = WINFUNCTYPE(DWORD, DWORD, POINTER(HANDLE), BOOL, DWORD) \
+                             (("WaitForMultipleObjects", windll.kernel32))
 
-class Event(object):
+INFINITE           = 0xFFFFFFFF
+
+WAIT_ABANDONED     = 0x00000080
+WAIT_IO_COMPLETION = 0x000000C0
+WAIT_OBJECT_0      = 0x00000000
+WAIT_TIMEOUT       = 0x00000102
+WAIT_FAILED        = 0xFFFFFFFF
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms682396%28v=vs.85%29.aspx>
+CreateEventW = WINFUNCTYPE(HANDLE, LPVOID, BOOL, BOOL, LPCWSTR) \
+                   (("CreateEventW", windll.kernel32))
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms686211%28v=vs.85%29.aspx>
+SetEvent = WINFUNCTYPE(BOOL, HANDLE)(("SetEvent", windll.kernel32))
+
+FALSE = 0
+TRUE  = 1
+
+def _create_event(auto_reset):
+    # no security descriptor, auto_reset, unsignalled, anonymous
+    hEvent = CreateEventW(None, auto_reset, FALSE, None)
+    if hEvent is None:
+        raise WinError()
+
+def _signal_event(hEvent):
+    if SetEvent(hEvent) == 0:
+        raise WinError()
+
+
+class StoppedException(Exception):
+    """The notifier has been stopped."""
+    pass
+
+
+class Notification(object):
     """
     * action:   a FILE_ACTION_* constant (not a bit mask)
     * filename: a Unicode string, giving the name relative to the watched directory
@@ -93,57 +143,95 @@ class Event(object):
         self.filename = filename
 
     def __repr__(self):
-        return "Event(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
+        return "Notification(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
 
 
 class FileNotifyInformation(object):
     """
     I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can
-    iterate over those structures, decoding them into Event objects.
+    iterate over those structures, decoding them into Notification objects.
     """
 
     def __init__(self, size=1024):
-        self.size = size
-        self.buffer = create_string_buffer(size)
-        address = addressof(self.buffer)
+        self._size = size
+        self._buffer = create_string_buffer(size)
+        address = addressof(self._buffer)
         _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,))
-        self.data = None
 
-    def read_changes(self, hDirectory, recursive, filter):
+        self._hStopped  = _create_event(auto_reset=FALSE)
+        self._hNotified = _create_event(auto_reset=TRUE)
+        self._events = (HANDLE*2)(self._hStopped, self._hNotified)
+
+        self._overlapped = OVERLAPPED()
+        self._overlapped.Internal = None
+        self._overlapped.InternalHigh = None
+        self._overlapped.Offset = 0
+        self._overlapped.OffsetHigh = 0
+        self._overlapped.Pointer = None
+        self._overlapped.hEvent = self._hNotified
+
+    def __del__(self):
+        if hasattr(self, '_hStopped'):
+            CloseHandle(self._hStopped)
+        if hasattr(self, '_hNotified'):
+            CloseHandle(self._hNotified)
+
+    def stop(self):
+        _signal_event(self._hStopped)
+
+    def read_notifications(self, hDirectory, recursive, filter):
+        """This does not block."""
+
         bytes_returned = DWORD(0)
+        print "here"
         r = ReadDirectoryChangesW(hDirectory,
-                                  self.buffer,
-                                  self.size,
+                                  self._buffer,
+                                  self._size,
                                   recursive,
                                   filter,
                                   byref(bytes_returned),
-                                  None,  # NULL -> no overlapped I/O
+                                  self._overlapped,
                                   None   # NULL -> no completion routine
                                  )
         if r == 0:
             raise WinError()
-        self.data = self.buffer.raw[:bytes_returned.value]
 
-    def __iter__(self):
-        # Iterator implemented as generator: <http://docs.python.org/library/stdtypes.html#generator-types>
+    def get_notifications(self):
+        """This blocks and then iterates over the notifications."""
+
+        r = WaitForMultipleObjects(2, self._events,
+                                   FALSE, # wait for any, not all
+                                   INFINITE)
+        if r == WAIT_FAILED:
+            raise WinError()
+        if r == WAIT_OBJECT_0:  # hStopped
+            raise StoppedException()
+        if r != WAIT_OBJECT_0+1:  # hNotified
+            raise OSError("unexpected return from WaitForMultipleObjects: %d" % (r,))
+
+        data = self._buffer.raw[:bytes_returned.value]
+        print data
+
         pos = 0
         while True:
-            bytes = self._read_dword(pos+8)
-            s = Event(self._read_dword(pos+4),
-                      self.data[pos+12 : pos+12+bytes].decode('utf-16-le'))
+            bytes = _read_dword(data, pos+8)
+            s = Notification(_read_dword(data, pos+4),
+                             data[pos+12 : pos+12+bytes].decode('utf-16-le'))
 
-            next_entry_offset = self._read_dword(pos)
+            next_entry_offset = _read_dword(data, pos)
+            print s
             yield s
             if next_entry_offset == 0:
                 break
             pos = pos + next_entry_offset
 
-    def _read_dword(self, i):
-        # little-endian
-        return ( ord(self.data[i])          |
-                (ord(self.data[i+1]) <<  8) |
-                (ord(self.data[i+2]) << 16) |
-                (ord(self.data[i+3]) << 24))
+
+def _read_dword(data, i):
+    # little-endian
+    return ( ord(data[i])          |
+            (ord(data[i+1]) <<  8) |
+            (ord(data[i+2]) << 16) |
+            (ord(data[i+3]) << 24))
 
 
 def _open_directory(path_u):
@@ -171,20 +259,17 @@ def simple_test():
     fni = FileNotifyInformation()
     print "Waiting..."
     while True:
-        fni.read_changes(hDirectory, recursive, filter)
-        print repr(fni.data)
-        for info in fni:
+        fni.read_notifications(hDirectory, recursive, filter)
+        for info in fni.get_notifications():
             print info
 
 
-NOT_STARTED = "NOT_STARTED"
-STARTED     = "STARTED"
-STOPPING    = "STOPPING"
-STOPPED     = "STOPPED"
-
 class INotify(PollMixin):
     def __init__(self):
-        self._state = NOT_STARTED
+        self._fni = FileNotifyInformation()
+        self._started = False
+        self._stop = False
+        self._stopped = False
         self._filter = None
         self._callbacks = None
         self._hDirectory = None
@@ -196,20 +281,21 @@ class INotify(PollMixin):
         self._pending_delay = delay
 
     def startReading(self):
+        # Twisted's version of this is synchronous.
         deferToThread(self._thread)
-        return self.poll(lambda: self._state != NOT_STARTED)
+        return self.poll(lambda: self._started)
 
     def stopReading(self):
-        # FIXME race conditions
-        if self._state != STOPPED:
-            self._state = STOPPING
+        self._stop = True
+        self._fni.stop()
 
     def wait_until_stopped(self):
-        fileutil.write(os.path.join(self._path.path, u".ignore-me"), "")
-        return self.poll(lambda: self._state == STOPPED)
+        if not self._stop:
+            self.stopReading()
+        return self.poll(lambda: self._stopped)
 
     def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
-        precondition(self._state == NOT_STARTED, "watch() can only be called before startReading()", state=self._state)
+        precondition(not self._started, "watch() can only be called before startReading()")
         precondition(self._filter is None, "only one watch is supported")
         precondition(isinstance(autoAdd, bool), autoAdd=autoAdd)
         precondition(isinstance(recursive, bool), recursive=recursive)
@@ -239,19 +325,17 @@ class INotify(PollMixin):
             # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in
             # <http://twistedmatrix.com/documents/current/core/howto/threading.html>.
 
-            fni = FileNotifyInformation()
-
             while True:
-                self._state = STARTED
-                fni.read_changes(self._hDirectory, self._recursive, self._filter)
-                for info in fni:
-                    if self._state == STOPPING:
-                        hDirectory = self._hDirectory
-                        self._callbacks = None
-                        self._hDirectory = None
-                        CloseHandle(hDirectory)
-                        self._state = STOPPED
-                        return
+                # We must set _started to True *after* calling read_notifications, so that
+                # the caller of startReading() can tell when we've actually started reading.
+
+                self._fni.read_notifications(self._hDirectory, self._recursive, self._filter)
+                self._started = True
+
+                for info in self._fni.get_notifications():
+                    print info
+                    if self._stop:
+                        raise StoppedException()
 
                     path = self._path.preauthChild(info.filename)  # FilePath with Unicode path
                     #mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
@@ -268,7 +352,16 @@ class INotify(PollMixin):
                                         log.err(e)
                             reactor.callLater(self._pending_delay, _do_callbacks)
                     reactor.callFromThread(_maybe_notify, path)
+        except StoppedException:
+            self._do_stop()
         except Exception, e:
             log.err(e)
-            self._state = STOPPED
+            self._do_stop()
             raise
+
+    def _do_stop(self):
+        hDirectory = self._hDirectory
+        self._callbacks = []
+        self._hDirectory = None
+        CloseHandle(hDirectory)
+        self._stopped = True