Windows support for drop-upload. fixes #1431
authorDaira Hopwood <daira@jacaranda.org>
Wed, 15 Apr 2015 13:25:07 +0000 (14:25 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Wed, 15 Apr 2015 13:25:07 +0000 (14:25 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/frontends/drop_upload.py
src/allmydata/test/test_drop_upload.py
src/allmydata/util/fileutil.py
src/allmydata/windows/inotify.py [new file with mode: 0644]

index 906decfe21649550d15e7388471a94c42f746bb8..ff8c3c7e3ed0afd984c02f1245b54831fae31cd3 100644 (file)
@@ -16,7 +16,8 @@ from allmydata.immutable.upload import FileName
 class DropUploader(service.MultiService):
     name = 'drop-upload'
 
-    def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None, deque_max_len=100):
+    def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None,
+                 deque_max_len=100, pending_delay=1.0):
         service.MultiService.__init__(self)
 
         try:
@@ -40,7 +41,10 @@ class DropUploader(service.MultiService):
         self.is_upload_ready = False
 
         if inotify is None:
-            from twisted.internet import inotify
+            if sys.platform == "win32":
+                from allmydata.windows import inotify
+            else:
+                from twisted.internet import inotify
         self._inotify = inotify
 
         if not self._local_path.exists():
@@ -51,13 +55,15 @@ class DropUploader(service.MultiService):
         # TODO: allow a path rather than a cap URI.
         self._parent = self._client.create_node_from_uri(upload_dircap)
         if not IDirectoryNode.providedBy(self._parent):
-            raise AssertionError("The '[drop_upload] upload.dircap' parameter does not refer to a directory.")
+            raise AssertionError("The URI in 'private/drop_upload_dircap' does not refer to a directory.")
         if self._parent.is_unknown() or self._parent.is_readonly():
-            raise AssertionError("The '[drop_upload] upload.dircap' parameter is not a writecap to a directory.")
+            raise AssertionError("The URI in 'private/drop_upload_dircap' is not a writecap to a directory.")
 
         self._uploaded_callback = lambda ign: None
 
         self._notifier = inotify.INotify()
+        if hasattr(self._notifier, 'set_pending_delay'):
+            self._notifier.set_pending_delay(pending_delay)
 
         # We don't watch for IN_CREATE, because that would cause us to read and upload a
         # possibly-incomplete file before the application has closed it. There should always
@@ -105,12 +111,12 @@ class DropUploader(service.MultiService):
     def _process(self, path, events_mask):
         d = defer.succeed(None)
 
-        # FIXME: if this already exists as a mutable file, we replace the directory entry,
-        # but we should probably modify the file (as the SFTP frontend does).
+        # FIXME (ticket #1712): if this already exists as a mutable file, we replace the
+        # directory entry, but we should probably modify the file (as the SFTP frontend does).
         def _add_file(ign):
             name = path.basename()
             # on Windows the name is already Unicode
-            if not isinstance(name, unicode):
+            if sys.platform != "win32":
                 name = name.decode(get_filesystem_encoding())
 
             u = FileName(path.path, self._convergence)
index 141686e05bff37c1d155a4b4a9f2a5b5944bda45..ad6670681aba5a37795e95c289aa3b022fe2e2eb 100644 (file)
@@ -7,7 +7,7 @@ from twisted.internet import defer
 
 from allmydata.interfaces import IDirectoryNode, NoSuchChildError
 
-from allmydata.util import fake_inotify
+from allmydata.util import fileutil, fake_inotify
 from allmydata.util.encodingutil import get_filesystem_encoding
 from allmydata.util.consumer import download_to_data
 from allmydata.test.no_network import GridTestMixin
@@ -41,7 +41,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA
             self.upload_dirnode = n
             self.upload_dircap = n.get_uri()
             self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
-                                         inotify=self.inotify)
+                                         inotify=self.inotify, pending_delay=0.2)
             self.uploader.setServiceParent(self.client)
             d = self.uploader.startService()
             self.uploader.upload_ready()
@@ -106,6 +106,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA
             f.close()
         if temporary and sys.platform == "win32":
             os.unlink(path.path)
+        fileutil.flush_volume(path.path)
         self.notify_close_write(path)
 
         if temporary:
@@ -132,6 +133,8 @@ class MockTest(DropUploadTestMixin, unittest.TestCase):
         self.set_up_grid()
         errors_dir = os.path.join(self.basedir, "errors_dir")
         os.mkdir(errors_dir)
+        not_a_dir = os.path.join(self.basedir, 'NOT_A_DIR')
+        fileutil.write(not_a_dir, "")
 
         client = self.g.clients[0]
         d = client.create_dirnode()
@@ -145,10 +148,8 @@ class MockTest(DropUploadTestMixin, unittest.TestCase):
             self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
                             DropUploader, client, upload_dircap, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify)
 
-            fp = filepath.FilePath(self.basedir).child('NOT_A_DIR')
-            fp.touch()
             self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
-                            DropUploader, client, upload_dircap, fp.path, inotify=fake_inotify)
+                            DropUploader, client, upload_dircap, not_a_dir, inotify=fake_inotify)
 
             self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
                             DropUploader, client, 'bad', errors_dir, inotify=fake_inotify)
@@ -174,7 +175,7 @@ class RealTest(DropUploadTestMixin, unittest.TestCase):
     def test_drop_upload(self):
         # We should always have runtime.platform.supportsINotify, because we're using
         # Twisted >= 10.1.
-        if not runtime.platform.supportsINotify():
+        if sys.platform != "win32" and not runtime.platform.supportsINotify():
             raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.")
 
         self.inotify = None  # use the appropriate inotify for the platform
index 74132733169f306f8feb668f565d547f9b6f4df9..b4f6df17096a2265d9ef65a90979544b27180158 100644 (file)
@@ -521,3 +521,48 @@ def get_available_space(whichdir, reserved_space):
     except EnvironmentError:
         log.msg("OS call to get disk statistics failed")
         return 0
+
+
+if sys.platform == "win32":
+    from ctypes import WINFUNCTYPE, windll, WinError
+    from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
+
+    # <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
+    CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
+                      (("CreateFileW", windll.kernel32))
+
+    GENERIC_WRITE        = 0x40000000
+    FILE_SHARE_READ      = 0x00000001
+    FILE_SHARE_WRITE     = 0x00000002
+    OPEN_EXISTING        = 3
+    INVALID_HANDLE_VALUE = 0xFFFFFFFF
+
+    # <http://msdn.microsoft.com/en-us/library/aa364439%28v=vs.85%29.aspx>
+    FlushFileBuffers = WINFUNCTYPE(BOOL, HANDLE)(("FlushFileBuffers", windll.kernel32))
+
+    # <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
+    CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
+
+    # <http://social.msdn.microsoft.com/forums/en-US/netfxbcl/thread/4465cafb-f4ed-434f-89d8-c85ced6ffaa8/>
+    def flush_volume(path):
+        drive = os.path.splitdrive(os.path.realpath(path))[0]
+
+        hVolume = CreateFileW(u"\\\\.\\" + drive,
+                              GENERIC_WRITE,
+                              FILE_SHARE_READ | FILE_SHARE_WRITE,
+                              None,
+                              OPEN_EXISTING,
+                              0,
+                              None
+                             )
+        if hVolume == INVALID_HANDLE_VALUE:
+            raise WinError()
+
+        if FlushFileBuffers(hVolume) == 0:
+            raise WinError()
+
+        CloseHandle(hVolume)
+else:
+    def flush_volume(path):
+        # use sync()?
+        pass
diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py
new file mode 100644 (file)
index 0000000..51d80f1
--- /dev/null
@@ -0,0 +1,275 @@
+
+# Windows near-equivalent to twisted.internet.inotify
+# This should only be imported on Windows.
+
+import os, sys
+
+from twisted.internet import reactor
+from twisted.internet.threads import deferToThread
+
+from allmydata.util.fake_inotify import humanReadableMask, \
+    IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \
+    IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \
+    IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \
+    IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED
+[humanReadableMask, \
+    IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \
+    IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \
+    IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \
+    IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED]
+
+from allmydata.util.assertutil import _assert, precondition
+from allmydata.util.encodingutil import quote_output
+from allmydata.util import log, fileutil
+from allmydata.util.pollmixin import PollMixin
+
+from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, create_string_buffer, addressof
+from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
+
+# <http://msdn.microsoft.com/en-us/library/gg258116%28v=vs.85%29.aspx>
+FILE_LIST_DIRECTORY              = 1
+
+# <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
+CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
+                  (("CreateFileW", windll.kernel32))
+
+FILE_SHARE_READ                  = 0x00000001
+FILE_SHARE_WRITE                 = 0x00000002
+FILE_SHARE_DELETE                = 0x00000004
+
+OPEN_EXISTING                    = 3
+
+FILE_FLAG_BACKUP_SEMANTICS       = 0x02000000
+
+# <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
+CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
+
+# <http://msdn.microsoft.com/en-us/library/aa365465%28v=vs.85%29.aspx>
+ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), LPVOID, LPVOID) \
+                            (("ReadDirectoryChangesW", windll.kernel32))
+
+FILE_NOTIFY_CHANGE_FILE_NAME     = 0x00000001
+FILE_NOTIFY_CHANGE_DIR_NAME      = 0x00000002
+FILE_NOTIFY_CHANGE_ATTRIBUTES    = 0x00000004
+#FILE_NOTIFY_CHANGE_SIZE         = 0x00000008
+FILE_NOTIFY_CHANGE_LAST_WRITE    = 0x00000010
+FILE_NOTIFY_CHANGE_LAST_ACCESS   = 0x00000020
+#FILE_NOTIFY_CHANGE_CREATION     = 0x00000040
+FILE_NOTIFY_CHANGE_SECURITY      = 0x00000100
+
+# <http://msdn.microsoft.com/en-us/library/aa364391%28v=vs.85%29.aspx>
+FILE_ACTION_ADDED                = 0x00000001
+FILE_ACTION_REMOVED              = 0x00000002
+FILE_ACTION_MODIFIED             = 0x00000003
+FILE_ACTION_RENAMED_OLD_NAME     = 0x00000004
+FILE_ACTION_RENAMED_NEW_NAME     = 0x00000005
+
+_action_to_string = {
+    FILE_ACTION_ADDED            : "FILE_ACTION_ADDED",
+    FILE_ACTION_REMOVED          : "FILE_ACTION_REMOVED",
+    FILE_ACTION_MODIFIED         : "FILE_ACTION_MODIFIED",
+    FILE_ACTION_RENAMED_OLD_NAME : "FILE_ACTION_RENAMED_OLD_NAME",
+    FILE_ACTION_RENAMED_NEW_NAME : "FILE_ACTION_RENAMED_NEW_NAME",
+}
+
+_action_to_inotify_mask = {
+    FILE_ACTION_ADDED            : IN_CREATE,
+    FILE_ACTION_REMOVED          : IN_DELETE,
+    FILE_ACTION_MODIFIED         : IN_CHANGED,
+    FILE_ACTION_RENAMED_OLD_NAME : IN_MOVED_FROM,
+    FILE_ACTION_RENAMED_NEW_NAME : IN_MOVED_TO,
+}
+
+INVALID_HANDLE_VALUE             = 0xFFFFFFFF
+
+
+class Event(object):
+    """
+    * action:   a FILE_ACTION_* constant (not a bit mask)
+    * filename: a Unicode string, giving the name relative to the watched directory
+    """
+    def __init__(self, action, filename):
+        self.action = action
+        self.filename = filename
+
+    def __repr__(self):
+        return "Event(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
+
+
+class FileNotifyInformation(object):
+    """
+    I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can
+    iterate over those structures, decoding them into Event objects.
+    """
+
+    def __init__(self, size=1024):
+        self.size = size
+        self.buffer = create_string_buffer(size)
+        address = addressof(self.buffer)
+        _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,))
+        self.data = None
+
+    def read_changes(self, hDirectory, recursive, filter):
+        bytes_returned = DWORD(0)
+        r = ReadDirectoryChangesW(hDirectory,
+                                  self.buffer,
+                                  self.size,
+                                  recursive,
+                                  filter,
+                                  byref(bytes_returned),
+                                  None,  # NULL -> no overlapped I/O
+                                  None   # NULL -> no completion routine
+                                 )
+        if r == 0:
+            raise WinError()
+        self.data = self.buffer.raw[:bytes_returned.value]
+
+    def __iter__(self):
+        # Iterator implemented as generator: <http://docs.python.org/library/stdtypes.html#generator-types>
+        pos = 0
+        while True:
+            bytes = self._read_dword(pos+8)
+            s = Event(self._read_dword(pos+4),
+                      self.data[pos+12 : pos+12+bytes].decode('utf-16-le'))
+
+            next_entry_offset = self._read_dword(pos)
+            yield s
+            if next_entry_offset == 0:
+                break
+            pos = pos + next_entry_offset
+
+    def _read_dword(self, i):
+        # little-endian
+        return ( ord(self.data[i])          |
+                (ord(self.data[i+1]) <<  8) |
+                (ord(self.data[i+2]) << 16) |
+                (ord(self.data[i+3]) << 24))
+
+
+def _open_directory(path_u):
+    hDirectory = CreateFileW(path_u,
+                             FILE_LIST_DIRECTORY,         # access rights
+                             FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+                                                          # don't prevent other processes from accessing
+                             None,                        # no security descriptor
+                             OPEN_EXISTING,               # directory must already exist
+                             FILE_FLAG_BACKUP_SEMANTICS,  # necessary to open a directory
+                             None                         # no template file
+                            )
+    if hDirectory == INVALID_HANDLE_VALUE:
+        e = WinError()
+        raise OSError("Opening directory %s gave Windows error %r: %s" % (quote_output(path_u), e.args[0], e.args[1]))
+    return hDirectory
+
+
+def simple_test():
+    path_u = u"test"
+    filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE
+    recursive = False
+
+    hDirectory = _open_directory(path_u)
+    fni = FileNotifyInformation()
+    print "Waiting..."
+    while True:
+        fni.read_changes(hDirectory, recursive, filter)
+        print repr(fni.data)
+        for info in fni:
+            print info
+
+
+NOT_STARTED = "NOT_STARTED"
+STARTED     = "STARTED"
+STOPPING    = "STOPPING"
+STOPPED     = "STOPPED"
+
+class INotify(PollMixin):
+    def __init__(self):
+        self._state = NOT_STARTED
+        self._filter = None
+        self._callbacks = None
+        self._hDirectory = None
+        self._path = None
+        self._pending = set()
+        self._pending_delay = 1.0
+
+    def set_pending_delay(self, delay):
+        self._pending_delay = delay
+
+    def startReading(self):
+        deferToThread(self._thread)
+        return self.poll(lambda: self._state != NOT_STARTED)
+
+    def stopReading(self):
+        # FIXME race conditions
+        if self._state != STOPPED:
+            self._state = STOPPING
+
+    def wait_until_stopped(self):
+        fileutil.write(os.path.join(self._path.path, u".ignore-me"), "")
+        return self.poll(lambda: self._state == STOPPED)
+
+    def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
+        precondition(self._state == NOT_STARTED, "watch() can only be called before startReading()", state=self._state)
+        precondition(self._filter is None, "only one watch is supported")
+        precondition(isinstance(autoAdd, bool), autoAdd=autoAdd)
+        precondition(isinstance(recursive, bool), recursive=recursive)
+        precondition(autoAdd == recursive, "need autoAdd and recursive to be the same", autoAdd=autoAdd, recursive=recursive)
+
+        self._path = path
+        path_u = path.path
+        if not isinstance(path_u, unicode):
+            path_u = path_u.decode(sys.getfilesystemencoding())
+            _assert(isinstance(path_u, unicode), path_u=path_u)
+
+        self._filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE
+
+        if mask & (IN_ACCESS | IN_CLOSE_NOWRITE | IN_OPEN):
+            self._filter = self._filter | FILE_NOTIFY_CHANGE_LAST_ACCESS
+        if mask & IN_ATTRIB:
+            self._filter = self._filter | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY
+
+        self._recursive = recursive
+        self._callbacks = callbacks or []
+        self._hDirectory = _open_directory(path_u)
+
+    def _thread(self):
+        try:
+            _assert(self._filter is not None, "no watch set")
+
+            # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in
+            # <http://twistedmatrix.com/documents/current/core/howto/threading.html>.
+
+            fni = FileNotifyInformation()
+
+            while True:
+                self._state = STARTED
+                fni.read_changes(self._hDirectory, self._recursive, self._filter)
+                for info in fni:
+                    if self._state == STOPPING:
+                        hDirectory = self._hDirectory
+                        self._callbacks = None
+                        self._hDirectory = None
+                        CloseHandle(hDirectory)
+                        self._state = STOPPED
+                        return
+
+                    path = self._path.preauthChild(info.filename)  # FilePath with Unicode path
+                    mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
+
+                    def _maybe_notify(path, mask):
+                        event = (path, mask)
+                        if event not in self._pending:
+                            self._pending.add(event)
+                            def _do_callbacks():
+                                self._pending.remove(event)
+                                for cb in self._callbacks:
+                                    try:
+                                        cb(None, path, mask)
+                                    except Exception, e:
+                                        log.err(e)
+                            reactor.callLater(self._pending_delay, _do_callbacks)
+                    reactor.callFromThread(_maybe_notify, path, mask)
+        except Exception, e:
+            log.err(e)
+            self._state = STOPPED
+            raise