From 40bcefa866f48f94bb01aeb5507c186ae8db6dbb Mon Sep 17 00:00:00 2001 From: Daira Hopwood Date: Wed, 29 Apr 2015 16:08:46 +0100 Subject: [PATCH] WIP Signed-off-by: Daira Hopwood --- src/allmydata/frontends/drop_upload.py | 5 +- src/allmydata/windows/inotify.py | 211 ++++++++++++++++++------- 2 files changed, 155 insertions(+), 61 deletions(-) diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index ff8c3c7e..f43462bc 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -74,7 +74,10 @@ class DropUploader(service.MultiService): def startService(self): service.MultiService.startService(self) - d = self._notifier.startReading() + + # startReading can be asynchronous [Windows] or synchronous [Twisted's INotify]. + d = defer.succeed(None) + d.addCallback(lambda ign: self._notifier.startReading()) self._stats_provider.count('drop_upload.dirs_monitored', 1) return d diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py index fc2e1d48..4255b6fd 100644 --- a/src/allmydata/windows/inotify.py +++ b/src/allmydata/windows/inotify.py @@ -23,13 +23,14 @@ from allmydata.util.encodingutil import quote_output from allmydata.util import log, fileutil from allmydata.util.pollmixin import PollMixin -from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, create_string_buffer, addressof +from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, \ + create_string_buffer, addressof, Structure from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID -# +# FILE_LIST_DIRECTORY = 1 -# +# CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \ (("CreateFileW", windll.kernel32)) @@ -40,12 +41,24 @@ FILE_SHARE_DELETE = 0x00000004 OPEN_EXISTING = 3 FILE_FLAG_BACKUP_SEMANTICS = 0x02000000 +FILE_FLAG_OVERLAPPED = 0x40000000 -# +# CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32)) -# -ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), LPVOID, LPVOID) \ +# +class OVERLAPPED(Structure): + _fields_ = [('Internal', LPVOID), + ('InternalHigh', LPVOID), + ('Offset', DWORD), + ('OffsetHigh', DWORD), + ('Pointer', LPVOID), + ('hEvent', HANDLE), + ] + +# +ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, + POINTER(DWORD), POINTER(OVERLAPPED), LPVOID) \ (("ReadDirectoryChangesW", windll.kernel32)) FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001 @@ -57,7 +70,7 @@ FILE_NOTIFY_CHANGE_LAST_ACCESS = 0x00000020 #FILE_NOTIFY_CHANGE_CREATION = 0x00000040 FILE_NOTIFY_CHANGE_SECURITY = 0x00000100 -# +# FILE_ACTION_ADDED = 0x00000001 FILE_ACTION_REMOVED = 0x00000002 FILE_ACTION_MODIFIED = 0x00000003 @@ -82,8 +95,45 @@ _action_to_inotify_mask = { INVALID_HANDLE_VALUE = 0xFFFFFFFF +# +WaitForMultipleObjects = WINFUNCTYPE(DWORD, DWORD, POINTER(HANDLE), BOOL, DWORD) \ + (("WaitForMultipleObjects", windll.kernel32)) -class Event(object): +INFINITE = 0xFFFFFFFF + +WAIT_ABANDONED = 0x00000080 +WAIT_IO_COMPLETION = 0x000000C0 +WAIT_OBJECT_0 = 0x00000000 +WAIT_TIMEOUT = 0x00000102 +WAIT_FAILED = 0xFFFFFFFF + +# +CreateEventW = WINFUNCTYPE(HANDLE, LPVOID, BOOL, BOOL, LPCWSTR) \ + (("CreateEventW", windll.kernel32)) + +# +SetEvent = WINFUNCTYPE(BOOL, HANDLE)(("SetEvent", windll.kernel32)) + +FALSE = 0 +TRUE = 1 + +def _create_event(auto_reset): + # no security descriptor, auto_reset, unsignalled, anonymous + hEvent = CreateEventW(None, auto_reset, FALSE, None) + if hEvent is None: + raise WinError() + +def _signal_event(hEvent): + if SetEvent(hEvent) == 0: + raise WinError() + + +class StoppedException(Exception): + """The notifier has been stopped.""" + pass + + +class Notification(object): """ * action: a FILE_ACTION_* constant (not a bit mask) * filename: a Unicode string, giving the name relative to the watched directory @@ -93,58 +143,95 @@ class Event(object): self.filename = filename def __repr__(self): - return "Event(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename) + return "Notification(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename) class FileNotifyInformation(object): """ I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can - iterate over those structures, decoding them into Event objects. + iterate over those structures, decoding them into Notification objects. """ def __init__(self, size=1024): - self.size = size - self.buffer = create_string_buffer(size) - address = addressof(self.buffer) + self._size = size + self._buffer = create_string_buffer(size) + address = addressof(self._buffer) _assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,)) - self.data = None - def read_changes(self, hDirectory, recursive, filter): + self._hStopped = _create_event(auto_reset=FALSE) + self._hNotified = _create_event(auto_reset=TRUE) + self._events = (HANDLE*2)(self._hStopped, self._hNotified) + + self._overlapped = OVERLAPPED() + self._overlapped.Internal = None + self._overlapped.InternalHigh = None + self._overlapped.Offset = 0 + self._overlapped.OffsetHigh = 0 + self._overlapped.Pointer = None + self._overlapped.hEvent = self._hNotified + + def __del__(self): + if hasattr(self, '_hStopped'): + CloseHandle(self._hStopped) + if hasattr(self, '_hNotified'): + CloseHandle(self._hNotified) + + def stop(self): + _signal_event(self._hStopped) + + def read_notifications(self, hDirectory, recursive, filter): + """This does not block.""" + bytes_returned = DWORD(0) print "here" r = ReadDirectoryChangesW(hDirectory, - self.buffer, - self.size, + self._buffer, + self._size, recursive, filter, byref(bytes_returned), - None, # NULL -> no overlapped I/O + self._overlapped, None # NULL -> no completion routine ) if r == 0: raise WinError() - self.data = self.buffer.raw[:bytes_returned.value] - def __iter__(self): - # Iterator implemented as generator: + def get_notifications(self): + """This blocks and then iterates over the notifications.""" + + r = WaitForMultipleObjects(2, self._events, + FALSE, # wait for any, not all + INFINITE) + if r == WAIT_FAILED: + raise WinError() + if r == WAIT_OBJECT_0: # hStopped + raise StoppedException() + if r != WAIT_OBJECT_0+1: # hNotified + raise OSError("unexpected return from WaitForMultipleObjects: %d" % (r,)) + + data = self._buffer.raw[:bytes_returned.value] + print data + pos = 0 while True: - bytes = self._read_dword(pos+8) - s = Event(self._read_dword(pos+4), - self.data[pos+12 : pos+12+bytes].decode('utf-16-le')) + bytes = _read_dword(data, pos+8) + s = Notification(_read_dword(data, pos+4), + data[pos+12 : pos+12+bytes].decode('utf-16-le')) - next_entry_offset = self._read_dword(pos) + next_entry_offset = _read_dword(data, pos) + print s yield s if next_entry_offset == 0: break pos = pos + next_entry_offset - def _read_dword(self, i): - # little-endian - return ( ord(self.data[i]) | - (ord(self.data[i+1]) << 8) | - (ord(self.data[i+2]) << 16) | - (ord(self.data[i+3]) << 24)) + +def _read_dword(data, i): + # little-endian + return ( ord(data[i]) | + (ord(data[i+1]) << 8) | + (ord(data[i+2]) << 16) | + (ord(data[i+3]) << 24)) def _open_directory(path_u): @@ -172,20 +259,17 @@ def simple_test(): fni = FileNotifyInformation() print "Waiting..." while True: - fni.read_changes(hDirectory, recursive, filter) - print repr(fni.data) - for info in fni: + fni.read_notifications(hDirectory, recursive, filter) + for info in fni.get_notifications(): print info -NOT_STARTED = "NOT_STARTED" -STARTED = "STARTED" -STOPPING = "STOPPING" -STOPPED = "STOPPED" - class INotify(PollMixin): def __init__(self): - self._state = NOT_STARTED + self._fni = FileNotifyInformation() + self._started = False + self._stop = False + self._stopped = False self._filter = None self._callbacks = None self._hDirectory = None @@ -197,20 +281,21 @@ class INotify(PollMixin): self._pending_delay = delay def startReading(self): + # Twisted's version of this is synchronous. deferToThread(self._thread) - return self.poll(lambda: self._state != NOT_STARTED) + return self.poll(lambda: self._started) def stopReading(self): - # FIXME race conditions - if self._state != STOPPED: - self._state = STOPPING + self._stop = True + self._fni.stop() def wait_until_stopped(self): - fileutil.write(os.path.join(self._path.path, u".ignore-me"), "") - return self.poll(lambda: self._state == STOPPED) + if not self._stop: + self.stopReading() + return self.poll(lambda: self._stopped) def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): - precondition(self._state == NOT_STARTED, "watch() can only be called before startReading()", state=self._state) + precondition(not self._started, "watch() can only be called before startReading()") precondition(self._filter is None, "only one watch is supported") precondition(isinstance(autoAdd, bool), autoAdd=autoAdd) precondition(isinstance(recursive, bool), recursive=recursive) @@ -240,20 +325,17 @@ class INotify(PollMixin): # To call Twisted or Tahoe APIs, use reactor.callFromThread as described in # . - fni = FileNotifyInformation() - while True: - self._state = STARTED - fni.read_changes(self._hDirectory, self._recursive, self._filter) - for info in fni: + # We must set _started to True *after* calling read_notifications, so that + # the caller of startReading() can tell when we've actually started reading. + + self._fni.read_notifications(self._hDirectory, self._recursive, self._filter) + self._started = True + + for info in self._fni.get_notifications(): print info - if self._state == STOPPING: - hDirectory = self._hDirectory - self._callbacks = None - self._hDirectory = None - CloseHandle(hDirectory) - self._state = STOPPED - return + if self._stop: + raise StoppedException() path = self._path.preauthChild(info.filename) # FilePath with Unicode path #mask = _action_to_inotify_mask.get(info.action, IN_CHANGED) @@ -270,7 +352,16 @@ class INotify(PollMixin): log.err(e) reactor.callLater(self._pending_delay, _do_callbacks) reactor.callFromThread(_maybe_notify, path) + except StoppedException: + self._do_stop() except Exception, e: log.err(e) - self._state = STOPPED + self._do_stop() raise + + def _do_stop(self): + hDirectory = self._hDirectory + self._callbacks = [] + self._hDirectory = None + CloseHandle(hDirectory) + self._stopped = True -- 2.45.2