From: David Stainton Date: Fri, 17 Apr 2015 19:59:37 +0000 (+0100) Subject: Teach upload deque to be sequential and asynchronous X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/reliability?a=commitdiff_plain;h=9d6b03fd6cf6c439dc560f65d82d71f82f06d6d0;p=tahoe-lafs%2Ftahoe-lafs.git Teach upload deque to be sequential and asynchronous --- diff --git a/src/allmydata/frontends/drop_upload.py b/src/allmydata/frontends/drop_upload.py index 33712897..75248e55 100644 --- a/src/allmydata/frontends/drop_upload.py +++ b/src/allmydata/frontends/drop_upload.py @@ -2,7 +2,7 @@ import sys, os from collections import deque -from twisted.internet import defer, reactor +from twisted.internet import defer, reactor, task from twisted.python.failure import Failure from twisted.python.filepath import FilePath from twisted.application import service @@ -135,21 +135,22 @@ class DropUploader(service.MultiService): processing the upload items... """ self.is_upload_ready = True - self._process_deque() + self._turn_deque() def _append_to_deque(self, path): self._upload_deque.append(path) self._pending.add(path) if self.is_upload_ready: - reactor.callLater(0, self._process_deque) - - def _process_deque(self): - while True: - try: - path = self._upload_deque.pop() - self._process(path) - except IndexError: - break + reactor.callLater(0, self._turn_deque) + + def _turn_deque(self): + try: + path = self._upload_deque.pop() + except IndexError: + self._log("magic folder upload deque is now empty") + return + d = task.deferLater(reactor, 0, self._process, path) + d.addCallback(lambda ign: self._turn_deque()) def _notify(self, opaque, path, events_mask): self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))