From: Daira Hopwood Date: Thu, 30 Jul 2015 17:52:05 +0000 (+0100) Subject: WIP to unify queue processing between uploader and downloader. X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=1ac461737926a72752833fbb2435aa3501cad0d7;p=tahoe-lafs%2Ftahoe-lafs.git WIP to unify queue processing between uploader and downloader. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 0f24c548..c5195f63 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -104,13 +104,16 @@ class QueueMixin(object): self._pending = set() self._callback = lambda ign: None self._ignore_count = 0 + self._stopped = False + self._turn_delay = 0 def _count(self, counter_name, delta=1): self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta) def _log(self, msg): - self._client.log("Magic Folder %s: %s" % (self._name, msg)) - #print "_log %s" % (msg,) + s = "Magic Folder %s: %s" % (self._name, msg) + self._client.log(s) + print s #open("events", "ab+").write(msg) def _append_to_deque(self, path): @@ -123,14 +126,17 @@ class QueueMixin(object): reactor.callLater(0, self._turn_deque) def _turn_deque(self): + if self._stopped: + return try: - path = self._deque.pop() + item = self._deque.pop() except IndexError: - self._log("magic folder upload deque is now empty") - self._lazy_tail = defer.succeed(None) - return - self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path)) - self._lazy_tail.addCallback(lambda ign: self._turn_deque()) + self._log("deque is now empty") + self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty()) + else: + self._lazy_tail.addCallback(lambda ign: self._process(item)) + #self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,))) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque)) def _do_callback(self, res): if self._ignore_count == 0: @@ -270,6 +276,9 @@ class Uploader(QueueMixin): path_u = unicode_from_filepath(path) self._append_to_deque(path_u) + def _when_queue_is_empty(self): + return defer.succeed(None) + def _process(self, path_u): precondition(isinstance(path_u, unicode), path_u) d = defer.succeed(None) @@ -373,9 +382,8 @@ class Downloader(QueueMixin): if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") - self._remote_scan_delay = 3 # XXX + self._turn_delay = 3 # delay between remote scans self._download_scan_batch = {} # path -> [(filenode, metadata)] - self._stopped = False def start_scanning(self): self._scan_remote_collective() @@ -493,7 +501,13 @@ class Downloader(QueueMixin): extension += [(name, file_node, metadata)] return extension - def _download_file(self, name, file_node): + def _when_queue_is_empty(self): + d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective) + d.addCallback(lambda ign: self._turn_deque()) + return d + + def _process(self, item): + (name, file_node, metadata) = item d = file_node.download_best_version() def succeeded(res): d.addCallback(lambda result: self._write_downloaded_file(name, result)) @@ -522,18 +536,3 @@ class Downloader(QueueMixin): self._count('download_objects_queued') if self.is_ready: reactor.callLater(0, self._turn_deque) - - # FIXME move to QueueMixin - def _turn_deque(self): - if self._stopped: - return - try: - file_path, file_node, metadata = self._deque.pop() - except IndexError: - self._log("magic folder upload deque is now empty") - self._lazy_tail = defer.succeed(None) - self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective)) - self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque)) - return - self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) - self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))