from allmydata.util import log, fileutil
from allmydata.util.pollmixin import PollMixin
-from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, create_string_buffer, addressof
+from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, \
+ create_string_buffer, addressof, Structure
from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
-# <http://msdn.microsoft.com/en-us/library/gg258116%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/gg258116%28v=vs.85%29.aspx>
FILE_LIST_DIRECTORY = 1
-# <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa363858%28v=vs.85%29.aspx>
CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
(("CreateFileW", windll.kernel32))
OPEN_EXISTING = 3
FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
+FILE_FLAG_OVERLAPPED = 0x40000000
-# <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms724211%28v=vs.85%29.aspx>
CloseHandle = WINFUNCTYPE(BOOL, HANDLE)(("CloseHandle", windll.kernel32))
-# <http://msdn.microsoft.com/en-us/library/aa365465%28v=vs.85%29.aspx>
-ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD, POINTER(DWORD), LPVOID, LPVOID) \
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms684342%28v=vs.85%29.aspx>
+class OVERLAPPED(Structure):
+ _fields_ = [('Internal', LPVOID),
+ ('InternalHigh', LPVOID),
+ ('Offset', DWORD),
+ ('OffsetHigh', DWORD),
+ ('Pointer', LPVOID),
+ ('hEvent', HANDLE),
+ ]
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365465%28v=vs.85%29.aspx>
+ReadDirectoryChangesW = WINFUNCTYPE(BOOL, HANDLE, LPVOID, DWORD, BOOL, DWORD,
+ POINTER(DWORD), POINTER(OVERLAPPED), LPVOID) \
(("ReadDirectoryChangesW", windll.kernel32))
FILE_NOTIFY_CHANGE_FILE_NAME = 0x00000001
#FILE_NOTIFY_CHANGE_CREATION = 0x00000040
FILE_NOTIFY_CHANGE_SECURITY = 0x00000100
-# <http://msdn.microsoft.com/en-us/library/aa364391%28v=vs.85%29.aspx>
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa364391%28v=vs.85%29.aspx>
FILE_ACTION_ADDED = 0x00000001
FILE_ACTION_REMOVED = 0x00000002
FILE_ACTION_MODIFIED = 0x00000003
INVALID_HANDLE_VALUE = 0xFFFFFFFF
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms687025%28v=vs.85%29.aspx>
+WaitForMultipleObjects = WINFUNCTYPE(DWORD, DWORD, POINTER(HANDLE), BOOL, DWORD) \
+ (("WaitForMultipleObjects", windll.kernel32))
-class Event(object):
+INFINITE = 0xFFFFFFFF
+
+WAIT_ABANDONED = 0x00000080
+WAIT_IO_COMPLETION = 0x000000C0
+WAIT_OBJECT_0 = 0x00000000
+WAIT_TIMEOUT = 0x00000102
+WAIT_FAILED = 0xFFFFFFFF
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms682396%28v=vs.85%29.aspx>
+CreateEventW = WINFUNCTYPE(HANDLE, LPVOID, BOOL, BOOL, LPCWSTR) \
+ (("CreateEventW", windll.kernel32))
+
+# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms686211%28v=vs.85%29.aspx>
+SetEvent = WINFUNCTYPE(BOOL, HANDLE)(("SetEvent", windll.kernel32))
+
+FALSE = 0
+TRUE = 1
+
+def _create_event(auto_reset):
+ # no security descriptor, auto_reset, unsignalled, anonymous
+ hEvent = CreateEventW(None, auto_reset, FALSE, None)
+ if hEvent is None:
+ raise WinError()
+
+def _signal_event(hEvent):
+ if SetEvent(hEvent) == 0:
+ raise WinError()
+
+
+class StoppedException(Exception):
+ """The notifier has been stopped."""
+ pass
+
+
+class Notification(object):
"""
* action: a FILE_ACTION_* constant (not a bit mask)
* filename: a Unicode string, giving the name relative to the watched directory
self.filename = filename
def __repr__(self):
- return "Event(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
+ return "Notification(%r, %r)" % (_action_to_string.get(self.action, self.action), self.filename)
class FileNotifyInformation(object):
"""
I represent a buffer containing FILE_NOTIFY_INFORMATION structures, and can
- iterate over those structures, decoding them into Event objects.
+ iterate over those structures, decoding them into Notification objects.
"""
def __init__(self, size=1024):
- self.size = size
- self.buffer = create_string_buffer(size)
- address = addressof(self.buffer)
+ self._size = size
+ self._buffer = create_string_buffer(size)
+ address = addressof(self._buffer)
_assert(address & 3 == 0, "address 0x%X returned by create_string_buffer is not DWORD-aligned" % (address,))
- self.data = None
- def read_changes(self, hDirectory, recursive, filter):
+ self._hStopped = _create_event(auto_reset=FALSE)
+ self._hNotified = _create_event(auto_reset=TRUE)
+ self._events = (HANDLE*2)(self._hStopped, self._hNotified)
+
+ self._overlapped = OVERLAPPED()
+ self._overlapped.Internal = None
+ self._overlapped.InternalHigh = None
+ self._overlapped.Offset = 0
+ self._overlapped.OffsetHigh = 0
+ self._overlapped.Pointer = None
+ self._overlapped.hEvent = self._hNotified
+
+ def __del__(self):
+ if hasattr(self, '_hStopped'):
+ CloseHandle(self._hStopped)
+ if hasattr(self, '_hNotified'):
+ CloseHandle(self._hNotified)
+
+ def stop(self):
+ _signal_event(self._hStopped)
+
+ def read_notifications(self, hDirectory, recursive, filter):
+ """This does not block."""
+
bytes_returned = DWORD(0)
print "here"
r = ReadDirectoryChangesW(hDirectory,
- self.buffer,
- self.size,
+ self._buffer,
+ self._size,
recursive,
filter,
byref(bytes_returned),
- None, # NULL -> no overlapped I/O
+ self._overlapped,
None # NULL -> no completion routine
)
if r == 0:
raise WinError()
- self.data = self.buffer.raw[:bytes_returned.value]
- def __iter__(self):
- # Iterator implemented as generator: <http://docs.python.org/library/stdtypes.html#generator-types>
+ def get_notifications(self):
+ """This blocks and then iterates over the notifications."""
+
+ r = WaitForMultipleObjects(2, self._events,
+ FALSE, # wait for any, not all
+ INFINITE)
+ if r == WAIT_FAILED:
+ raise WinError()
+ if r == WAIT_OBJECT_0: # hStopped
+ raise StoppedException()
+ if r != WAIT_OBJECT_0+1: # hNotified
+ raise OSError("unexpected return from WaitForMultipleObjects: %d" % (r,))
+
+ data = self._buffer.raw[:bytes_returned.value]
+ print data
+
pos = 0
while True:
- bytes = self._read_dword(pos+8)
- s = Event(self._read_dword(pos+4),
- self.data[pos+12 : pos+12+bytes].decode('utf-16-le'))
+ bytes = _read_dword(data, pos+8)
+ s = Notification(_read_dword(data, pos+4),
+ data[pos+12 : pos+12+bytes].decode('utf-16-le'))
- next_entry_offset = self._read_dword(pos)
+ next_entry_offset = _read_dword(data, pos)
+ print s
yield s
if next_entry_offset == 0:
break
pos = pos + next_entry_offset
- def _read_dword(self, i):
- # little-endian
- return ( ord(self.data[i]) |
- (ord(self.data[i+1]) << 8) |
- (ord(self.data[i+2]) << 16) |
- (ord(self.data[i+3]) << 24))
+
+def _read_dword(data, i):
+ # little-endian
+ return ( ord(data[i]) |
+ (ord(data[i+1]) << 8) |
+ (ord(data[i+2]) << 16) |
+ (ord(data[i+3]) << 24))
def _open_directory(path_u):
fni = FileNotifyInformation()
print "Waiting..."
while True:
- fni.read_changes(hDirectory, recursive, filter)
- print repr(fni.data)
- for info in fni:
+ fni.read_notifications(hDirectory, recursive, filter)
+ for info in fni.get_notifications():
print info
-NOT_STARTED = "NOT_STARTED"
-STARTED = "STARTED"
-STOPPING = "STOPPING"
-STOPPED = "STOPPED"
-
class INotify(PollMixin):
def __init__(self):
- self._state = NOT_STARTED
+ self._fni = FileNotifyInformation()
+ self._started = False
+ self._stop = False
+ self._stopped = False
self._filter = None
self._callbacks = None
self._hDirectory = None
self._pending_delay = delay
def startReading(self):
+ # Twisted's version of this is synchronous.
deferToThread(self._thread)
- return self.poll(lambda: self._state != NOT_STARTED)
+ return self.poll(lambda: self._started)
def stopReading(self):
- # FIXME race conditions
- if self._state != STOPPED:
- self._state = STOPPING
+ self._stop = True
+ self._fni.stop()
def wait_until_stopped(self):
- fileutil.write(os.path.join(self._path.path, u".ignore-me"), "")
- return self.poll(lambda: self._state == STOPPED)
+ if not self._stop:
+ self.stopReading()
+ return self.poll(lambda: self._stopped)
def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
- precondition(self._state == NOT_STARTED, "watch() can only be called before startReading()", state=self._state)
+ precondition(not self._started, "watch() can only be called before startReading()")
precondition(self._filter is None, "only one watch is supported")
precondition(isinstance(autoAdd, bool), autoAdd=autoAdd)
precondition(isinstance(recursive, bool), recursive=recursive)
# To call Twisted or Tahoe APIs, use reactor.callFromThread as described in
# <http://twistedmatrix.com/documents/current/core/howto/threading.html>.
- fni = FileNotifyInformation()
-
while True:
- self._state = STARTED
- fni.read_changes(self._hDirectory, self._recursive, self._filter)
- for info in fni:
+ # We must set _started to True *after* calling read_notifications, so that
+ # the caller of startReading() can tell when we've actually started reading.
+
+ self._fni.read_notifications(self._hDirectory, self._recursive, self._filter)
+ self._started = True
+
+ for info in self._fni.get_notifications():
print info
- if self._state == STOPPING:
- hDirectory = self._hDirectory
- self._callbacks = None
- self._hDirectory = None
- CloseHandle(hDirectory)
- self._state = STOPPED
- return
+ if self._stop:
+ raise StoppedException()
path = self._path.preauthChild(info.filename) # FilePath with Unicode path
#mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
log.err(e)
reactor.callLater(self._pending_delay, _do_callbacks)
reactor.callFromThread(_maybe_notify, path)
+ except StoppedException:
+ self._do_stop()
except Exception, e:
log.err(e)
- self._state = STOPPED
+ self._do_stop()
raise
+
+ def _do_stop(self):
+ hDirectory = self._hDirectory
+ self._callbacks = []
+ self._hDirectory = None
+ CloseHandle(hDirectory)
+ self._stopped = True