From: Daira Hopwood Date: Tue, 3 Nov 2015 14:17:35 +0000 (+0000) Subject: Move the pending_delay mechanism from Windows-specific code to magic_folder.py. X-Git-Url: https://git.rkrishnan.org/pf/content/en/seg/something?a=commitdiff_plain;h=f433e7a4160467171635f82e3dc4eca46ef4a52b;p=tahoe-lafs%2Ftahoe-lafs.git Move the pending_delay mechanism from Windows-specific code to magic_folder.py. This is necessary because we have insufficent information in the Windows code about how event masks are used. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 1be3e683..491df556 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -183,9 +183,7 @@ class Uploader(QueueMixin): self._inotify = get_inotify_module() self._notifier = self._inotify.INotify() self._pending = set() - - if hasattr(self._notifier, 'set_pending_delay'): - self._notifier.set_pending_delay(pending_delay) + self._pending_delay = pending_delay # TODO: what about IN_MOVE_SELF and IN_UNMOUNT? # @@ -222,15 +220,19 @@ class Uploader(QueueMixin): def start_scanning(self): self._log("start_scanning") self.is_ready = True - self._pending = self._db.get_all_relpaths() - self._log("all_files %r" % (self._pending)) - d = self._scan(u"") - def _add_pending(ign): - # This adds all of the files that were in the db but not already processed - # (normally because they have been deleted on disk). - self._log("adding %r" % (self._pending)) - self._deque.extend(self._pending) - d.addCallback(_add_pending) + + # Notify ALL THE THINGS. + # XXX this does not guarantee to notify parents before children. Is that a problem? + + all_relpaths = self._db.get_all_relpaths() + self._log("all_files %r" % (all_relpaths,)) + for relpath_u in all_relpaths: + fp = self._get_filepath(relpath_u) + self._notify(None, fp, IN_CHANGED) + + self._scan(u"") + + d = defer.succeed(None) d.addCallback(lambda ign: self._turn_deque()) return d @@ -239,31 +241,16 @@ class Uploader(QueueMixin): fp = self._get_filepath(reldir_u) try: children = listdir_filepath(fp) - except EnvironmentError: - raise Exception("WARNING: magic folder: permission denied on directory %s" - % quote_filepath(fp)) - except FilenameEncodingError: - raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" - % quote_filepath(fp)) + except (EnvironmentError, FilenameEncodingError) as e: + self._log("error listing directory %s: %s" % (quote_filepath(fp), e)) + return None - d = defer.succeed(None) for child in children: _assert(isinstance(child, unicode), child=child) - d.addCallback(lambda ign, child=child: - ("%s/%s" % (reldir_u, child) if reldir_u else child)) - def _add_pending(relpath_u): - if magicpath.should_ignore_file(relpath_u): - return None - - self._pending.add(relpath_u) - return relpath_u - d.addCallback(_add_pending) - # This call to _process doesn't go through the deque, and probably should. - d.addCallback(self._process) - d.addBoth(self._call_hook, 'processed') - d.addErrback(log.err) - - return d + relpath_u = ("%s/%s" % (reldir_u, child)) if reldir_u else child + if not magicpath.should_ignore_file(relpath_u): + child_fp = self._get_filepath(relpath_u) + self._notify(None, child_fp, IN_CHANGED) def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) @@ -288,14 +275,21 @@ class Uploader(QueueMixin): return self._log("appending %r to deque" % (relpath_u,)) - self._deque.append(relpath_u) self._pending.add(relpath_u) - self._count('objects_queued') - if self.is_ready: - if self._immediate: # for tests - self._turn_deque() - else: - self._clock.callLater(0, self._turn_deque) + def _do_append(): + self._pending.remove(relpath_u) + self._deque.append(relpath_u) + self._count('objects_queued') + if self.is_ready: + if self._immediate: # for tests + self._turn_deque() + else: + self._clock.callLater(0, self._turn_deque) + + if self._immediate: + _do_append() + else: + self._clock.callLater(self._pending_delay, _do_append) def _when_queue_is_empty(self): return defer.succeed(None) @@ -316,9 +310,6 @@ class Uploader(QueueMixin): fp = self._get_filepath(relpath_u) pathinfo = get_pathinfo(unicode_from_filepath(fp)) - self._log("about to remove %r from pending set %r" % - (relpath_u, self._pending)) - self._pending.remove(relpath_u) encoded_path_u = magicpath.path2magic(relpath_u) if not pathinfo.exists: diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py index 4e7bc904..cc7e534d 100644 --- a/src/allmydata/windows/inotify.py +++ b/src/allmydata/windows/inotify.py @@ -199,13 +199,8 @@ class INotify(PollMixin): self._callbacks = None self._hDirectory = None self._path = None - self._pending = set() - self._pending_delay = 1.0 self.recursive_includes_new_subdirectories = True - def set_pending_delay(self, delay): - self._pending_delay = delay - def startReading(self): deferToThread(self._thread) return self.poll(lambda: self._state != NOT_STARTED) @@ -265,20 +260,16 @@ class INotify(PollMixin): return path = self._path.preauthChild(info.filename) # FilePath with Unicode path - #mask = _action_to_inotify_mask.get(info.action, IN_CHANGED) - - def _maybe_notify(path): - if path not in self._pending: - self._pending.add(path) - def _do_callbacks(): - self._pending.remove(path) - for cb in self._callbacks: - try: - cb(None, path, IN_CHANGED) - except Exception, e: - log.err(e) - reactor.callLater(self._pending_delay, _do_callbacks) - reactor.callFromThread(_maybe_notify, path) + mask = _action_to_inotify_mask.get(info.action, IN_CHANGED) + + def _notify(path): + for cb in self._callbacks: + try: + cb(None, path, mask) + except Exception, e: + log.err(e) + + reactor.callFromThread(_notify, path) except Exception, e: log.err(e) self._state = STOPPED