From a018d862d40aa06c7cf3c96e43241cedfad05cd3 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Fri, 17 Apr 2015 20:59:37 +0100 Subject: [PATCH] Teach upload deque to be sequential and asynchronous --- src/allmydata/frontends/drop_upload.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index 4d51d6b6..ffe10ae0 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -2,7 +2,7 @@ import sys, os from collections import deque -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, task from twisted.python.failure import Failure from twisted.python.filepath import FilePath from twisted.application import service @@ -130,21 +130,22 @@ class DropUploader(service.MultiService): processing the upload items... """ self.is_upload_ready = True - self._process_deque() + self._turn_deque() def _append_to_deque(self, path): self._upload_deque.append(path) self._pending.add(path) if self.is_upload_ready: - reactor.callLater(0, self._process_deque) - - def _process_deque(self): - while True: - try: - path = self._upload_deque.pop() - self._process(path) - except IndexError: - break + reactor.callLater(0, self._turn_deque) + + def _turn_deque(self): + try: + path = self._upload_deque.pop() + except IndexError: + self._log("magic folder upload deque is now empty") + return + d = task.deferLater(reactor, 0, self._process, path) + d.addCallback(lambda ign: self._turn_deque()) def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) -- 2.45.2