f.close()
if temporary and sys.platform == "win32":
os.unlink(path_u)
- self.notify(path, self.inotify.IN_DELETE)
+ self.notify(path, self.inotify.IN_DELETE, flush=False)
event_mask = self.inotify.IN_CLOSE_WRITE
- fileutil.flush_volume(path_u)
self.notify(path, event_mask)
encoded_name_u = magicpath.path2magic(name_u)
def notify_bob_moved(ign):
d0 = self.bob_magicfolder.uploader.set_hook('processed')
p = abspath_expanduser_unicode(u"file1", base=self.bob_magicfolder.uploader._local_path_u)
- self.notify(to_filepath(p), self.inotify.IN_MOVED_FROM, magic=self.bob_magicfolder)
+ self.notify(to_filepath(p), self.inotify.IN_MOVED_FROM, magic=self.bob_magicfolder, flush=False)
self.notify(to_filepath(p + u'.backup'), self.inotify.IN_MOVED_TO, magic=self.bob_magicfolder)
bob_clock.advance(0)
return d0
self.inotify = fake_inotify
self.patch(magic_folder, 'get_inotify_module', lambda: self.inotify)
- def notify(self, path, mask, magic=None):
+ def notify(self, path, mask, magic=None, flush=True):
if magic is None:
magic = self.magicfolder
magic.uploader._notifier.event(path, mask)
+ # no flush for the mock test.
def test_errors(self):
self.set_up_grid()
MagicFolderTestMixin.setUp(self)
self.inotify = magic_folder.get_inotify_module()
- def notify(self, path, mask, **kw):
+ def notify(self, path, mask, magic=None, flush=True):
# Writing to the filesystem causes the notification.
- pass
+ # However, flushing filesystem buffers may be necessary on Windows.
+ if flush:
+ fileutil.flush_volume(path)
try:
magic_folder.get_inotify_module()