From: Daira Hopwood Date: Sat, 12 Apr 2014 21:49:01 +0000 (+0100) Subject: Windows support for drop-upload. fixes #1431 X-Git-Url: https://git.rkrishnan.org/index.php?a=commitdiff_plain;h=42c0f8135cc4920b10dece2a9695e52fed433bdd;p=tahoe-lafs%2Ftahoe-lafs.git Windows support for drop-upload. fixes #1431 Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index 6f25625e..6298427e 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -15,7 +15,7 @@ from allmydata.immutable.upload import FileName class DropUploader(service.MultiService): name = 'drop-upload' - def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None): + def __init__(self, client, upload_dircap, local_dir_utf8, inotify=None, pending_delay=1.0): service.MultiService.__init__(self) try: @@ -35,7 +35,10 @@ class DropUploader(service.MultiService): self._local_path = FilePath(local_dir) if inotify is None: - from twisted.internet import inotify + if sys.platform == "win32": + from allmydata.windows import inotify + else: + from twisted.internet import inotify self._inotify = inotify if not self._local_path.exists(): @@ -46,13 +49,15 @@ class DropUploader(service.MultiService): # TODO: allow a path rather than a cap URI. self._parent = self._client.create_node_from_uri(upload_dircap) if not IDirectoryNode.providedBy(self._parent): - raise AssertionError("The '[drop_upload] upload.dircap' parameter does not refer to a directory.") + raise AssertionError("The URI in 'private/drop_upload_dircap' does not refer to a directory.") if self._parent.is_unknown() or self._parent.is_readonly(): - raise AssertionError("The '[drop_upload] upload.dircap' parameter is not a writecap to a directory.") + raise AssertionError("The URI in 'private/drop_upload_dircap' is not a writecap to a directory.") self._uploaded_callback = lambda ign: None self._notifier = inotify.INotify() + if hasattr(self._notifier, 'set_pending_delay'): + self._notifier.set_pending_delay(pending_delay) # We don't watch for IN_CREATE, because that would cause us to read and upload a # possibly-incomplete file before the application has closed it. There should always @@ -76,12 +81,12 @@ class DropUploader(service.MultiService): def _process(self, opaque, path, events_mask): d = defer.succeed(None) - # FIXME: if this already exists as a mutable file, we replace the directory entry, - # but we should probably modify the file (as the SFTP frontend does). + # FIXME (ticket #1712): if this already exists as a mutable file, we replace the + # directory entry, but we should probably modify the file (as the SFTP frontend does). def _add_file(ign): name = path.basename() # on Windows the name is already Unicode - if not isinstance(name, unicode): + if sys.platform != "win32": name = name.decode(get_filesystem_encoding()) u = FileName(path.path, self._convergence) diff --git a/src/allmydata/test/test_drop_upload.py b/src/allmydata/test/test_drop_upload.py index 30ff8811..3d0de000 100644 --- a/src/allmydata/test/test_drop_upload.py +++ b/src/allmydata/test/test_drop_upload.py @@ -7,7 +7,7 @@ from twisted.internet import defer from allmydata.interfaces import IDirectoryNode, NoSuchChildError -from allmydata.util import fake_inotify +from allmydata.util import fileutil, fake_inotify from allmydata.util.encodingutil import get_filesystem_encoding from allmydata.util.consumer import download_to_data from allmydata.test.no_network import GridTestMixin @@ -41,7 +41,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA self.upload_dirnode = n self.upload_dircap = n.get_uri() self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'), - inotify=self.inotify) + inotify=self.inotify, pending_delay=0.2) return self.uploader.startService() d.addCallback(_made_upload_dir) @@ -101,6 +101,7 @@ class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonA f.close() if temporary and sys.platform == "win32": os.unlink(path.path) + fileutil.flush_volume(path.path) self.notify_close_write(path) if temporary: @@ -127,6 +128,8 @@ class MockTest(DropUploadTestMixin, unittest.TestCase): self.set_up_grid() errors_dir = os.path.join(self.basedir, "errors_dir") os.mkdir(errors_dir) + not_a_dir = os.path.join(self.basedir, 'NOT_A_DIR') + fileutil.write(not_a_dir, "") client = self.g.clients[0] d = client.create_dirnode() @@ -140,10 +143,8 @@ class MockTest(DropUploadTestMixin, unittest.TestCase): self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory', DropUploader, client, upload_dircap, os.path.join(self.basedir, "Laputa"), inotify=fake_inotify) - fp = filepath.FilePath(self.basedir).child('NOT_A_DIR') - fp.touch() self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory', - DropUploader, client, upload_dircap, fp.path, inotify=fake_inotify) + DropUploader, client, upload_dircap, not_a_dir, inotify=fake_inotify) self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory', DropUploader, client, 'bad', errors_dir, inotify=fake_inotify) @@ -169,7 +170,7 @@ class RealTest(DropUploadTestMixin, unittest.TestCase): def test_drop_upload(self): # We should always have runtime.platform.supportsINotify, because we're using # Twisted >= 10.1. - if not runtime.platform.supportsINotify(): + if sys.platform != "win32" and not runtime.platform.supportsINotify(): raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.") self.inotify = None # use the appropriate inotify for the platform diff --git a/src/allmydata/util/fileutil.py b/src/allmydata/util/fileutil.py index 8eb8e9fb..b248a40b 100644 --- a/src/allmydata/util/fileutil.py +++ b/src/allmydata/util/fileutil.py @@ -426,3 +426,48 @@ def get_available_space(whichdir, reserved_space): except EnvironmentError: log.msg("OS call to get disk statistics failed") return 0 + + +if sys.platform == "win32": + from ctypes import WINFUNCTYPE, windll, WinError + from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID + + # + CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ + (("CreateFileW", windll.kernel32)) + + GENERIC_WRITE = 0x40000000 + FILE_SHARE_READ = 0x00000001 + FILE_SHARE_WRITE = 0x00000002 + OPEN_EXISTING = 3 + INVALID_HANDLE_VALUE = 0xFFFFFFFF + + # + FlushFileBuffers = WINFUNCTYPE(BOOL, HANDLE)(("FlushFileBuffers", windll.kernel32)) + + # + CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32)) + + # + def flush_volume(path): + drive = os.path.splitdrive(os.path.realpath(path))[0] + + hVolume = CreateFileW(u"\\\\.\\" + drive, + GENERIC_WRITE, + FILE_SHARE_READ | FILE_SHARE_WRITE, + None, + OPEN_EXISTING, + 0, + None + ) + if hVolume == INVALID_HANDLE_VALUE: + raise WinError() + + if FlushFileBuffers(hVolume) == 0: + raise WinError() + + CloseHandle(hVolume) +else: + def flush_volume(path): + # use sync()? + pass diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py new file mode 100644 index 00000000..4f072cac --- /dev/null +++ b/src/allmydata/windows/inotify.py @@ -0,0 +1,269 @@ + +# Windows near-equivalent to twisted.internet.inotify +# This should only be imported on Windows. + +import os, sys + +from twisted.internet import reactor +from twisted.internet.threads import deferToThread + +from allmydata.util.fake_inotify import humanReadableMask, \ + IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \ + IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \ + IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \ + IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED +[humanReadableMask, \ + IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \ + IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \ + IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \ + IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED] + +from allmydata.util.encodingutil import quote_output +from allmydata.util import log, fileutil +from allmydata.util.pollmixin import PollMixin + +from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, create_string_buffer, addressof +from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID + +# +FILE_LIST_DIRECTORY = 1 + +# +CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ + (("CreateFileW", windll.kernel32)) + +FILE_SHARE_READ = 0x00000001 +FILE_SHARE_WRITE = 0x00000002 +FILE_SHARE_DELETE = 0x00000004 + +OPEN_EXISTING = 3 + +FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 + +# +CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32)) + +# +ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), LPVOID, LPVOID) \ + (("ReadDirectoryChangesW", windll.kernel32)) + +FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001 +FILE_NOTIFY_CHANGE_DIR_NAME = 0x00000002 +FILE_NOTIFY_CHANGE_ATTRIBUTES = 0x00000004 +#FILE_NOTIFY_CHANGE_SIZE = 0x00000008 +FILE_NOTIFY_CHANGE_LAST_WRITE = 0x00000010 +FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020 +#FILE_NOTIFY_CHANGE_CREATION = 0x00000040 +FILE_NOTIFY_CHANGE_SECURITY = 0x00000100 + +# +FILE_ACTION_ADDED = 0x00000001 +FILE_ACTION_REMOVED = 0x00000002 +FILE_ACTION_MODIFIED = 0x00000003 +FILE_ACTION_RENAMED_OLD_NAME = 0x00000004 +FILE_ACTION_RENAMED_NEW_NAME = 0x00000005 + +_action_to_string = { + FILE_ACTION_ADDED : "FILE_ACTION_ADDED", + FILE_ACTION_REMOVED : "FILE_ACTION_REMOVED", + FILE_ACTION_MODIFIED : "FILE_ACTION_MODIFIED", + FILE_ACTION_RENAMED_OLD_NAME : "FILE_ACTION_RENAMED_OLD_NAME", + FILE_ACTION_RENAMED_NEW_NAME : "FILE_ACTION_RENAMED_NEW_NAME", +} + +_action_to_inotify_mask = { + FILE_ACTION_ADDED : IN_CREATE, + FILE_ACTION_REMOVED : IN_DELETE, + FILE_ACTION_MODIFIED : IN_CHANGED, + FILE_ACTION_RENAMED_OLD_NAME : IN_MOVED_FROM, + FILE_ACTION_RENAMED_NEW_NAME : IN_MOVED_TO, +} + +INVALID_HANDLE_VALUE = 0xFFFFFFFF + + +class Event(object): + """ + * action: a FILE_ACTION_* constant (not a bit mask) + * filename: a Unicode string, giving the name relative to the watched directory + """ + def __init__(self, action, filename): + self.action = action + self.filename = filename + + def __repr__(self): + return "Event(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename) + + +class FileNotifyInformation(object): + """ + I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can + iterate over those structures, decoding them into Event objects. + """ + + def __init__(self, size=1024): + self.size = size + self.buffer = create_string_buffer(size) + address = addressof(self.buffer) + assert address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,) + self.data = None + + def read_changes(self, hDirectory, recursive, filter): + bytes_returned = DWORD(0) + r = ReadDirectoryChangesW(hDirectory, + self.buffer, + self.size, + recursive, + filter, + byref(bytes_returned), + None, # NULL -> no overlapped I/O + None # NULL -> no completion routine + ) + if r == 0: + raise WinError() + self.data = self.buffer.raw[:bytes_returned.value] + + def __iter__(self): + # Iterator implemented as generator: + pos = 0 + while True: + bytes = self._read_dword(pos+8) + s = Event(self._read_dword(pos+4), + self.data[pos+12 : pos+12+bytes].decode('utf-16-le')) + + next_entry_offset = self._read_dword(pos) + yield s + if next_entry_offset == 0: + break + pos = pos + next_entry_offset + + def _read_dword(self, i): + # little-endian + return ( ord(self.data[i]) | + (ord(self.data[i+1]) << 8) | + (ord(self.data[i+2]) << 16) | + (ord(self.data[i+3]) << 24)) + + +def _open_directory(path_u): + hDirectory = CreateFileW(path_u, + FILE_LIST_DIRECTORY, # access rights + FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE, + # don't prevent other processes from accessing + None, # no security descriptor + OPEN_EXISTING, # directory must already exist + FILE_FLAG_BACKUP_SEMANTICS, # necessary to open a directory + None # no template file + ) + if hDirectory == INVALID_HANDLE_VALUE: + e = WinError() + raise OSError("Opening directory %s gave Windows error %r: %s" % (quote_output(path_u), e.args[0], e.args[1])) + return hDirectory + + +def simple_test(): + path_u = u"test" + filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE + recursive = False + + hDirectory = _open_directory(path_u) + fni = FileNotifyInformation() + print "Waiting..." + while True: + fni.read_changes(hDirectory, recursive, filter) + print repr(fni.data) + for info in fni: + print info + + +class INotify(PollMixin): + def __init__(self): + self._stop = None + self._filter = None + self._callbacks = None + self._hDirectory = None + self._path = None + self._pending = set() + self._pending_delay = 1.0 + + def set_pending_delay(self, delay): + self._pending_delay = delay + + def startReading(self): + deferToThread(self._thread) + return self.poll(lambda: self._stop == False) + + def stopReading(self): + if self._stop is not None: + self._stop = True + + def wait_until_stopped(self): + fileutil.write(os.path.join(self._path.path, u".ignore-me"), "") + return self.poll(lambda: self._stop is None) + + def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): + assert self._stop is None, "watch() can only be called before startReading()" + assert self._filter is None, "only one watch is supported" + assert isinstance(autoAdd, bool), autoAdd + assert isinstance(recursive, bool), recursive + assert autoAdd == recursive, ("autoAdd = %r, recursive = %r, but we need them to be the same" + % (autoAdd, recursive)) + + self._path = path + path_u = path.path + if not isinstance(path_u, unicode): + path_u = path_u.decode(sys.getfilesystemencoding()) + assert isinstance(path_u, unicode), path_u + + self._filter = FILE_NOTIFY_CHANGE_FILE_NAME | FILE_NOTIFY_CHANGE_DIR_NAME | FILE_NOTIFY_CHANGE_LAST_WRITE + + if mask & (IN_ACCESS | IN_CLOSE_NOWRITE | IN_OPEN): + self._filter = self._filter | FILE_NOTIFY_CHANGE_LAST_ACCESS + if mask & IN_ATTRIB: + self._filter = self._filter | FILE_NOTIFY_CHANGE_ATTRIBUTES | FILE_NOTIFY_CHANGE_SECURITY + + self._recursive = recursive + self._callbacks = callbacks or [] + self._hDirectory = _open_directory(path_u) + + def _thread(self): + try: + assert self._filter is not None, "no watch set" + + # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in + # . + + fni = FileNotifyInformation() + + while True: + self._stop = False + fni.read_changes(self._hDirectory, self._recursive, self._filter) + for info in fni: + if self._stop: + hDirectory = self._hDirectory + self._callbacks = None + self._hDirectory = None + CloseHandle(hDirectory) + self._stop = None + return + + path = self._path.preauthChild(info.filename) # FilePath with Unicode path + mask = _action_to_inotify_mask.get(info.action, IN_CHANGED) + + def _maybe_notify(path, mask): + event = (path, mask) + if event not in self._pending: + self._pending.add(event) + def _do_callbacks(): + self._pending.remove(event) + for cb in self._callbacks: + try: + cb(None, path, mask) + except Exception, e: + log.msg(e) + reactor.callLater(self._pending_delay, _do_callbacks) + reactor.callFromThread(_maybe_notify, path, mask) + except Exception, e: + log.msg(e) + self.stop = False # pretend we started + raise