]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
WIP to unify queue processing between uploader and downloader.
authorDaira Hopwood <daira@jacaranda.org>
Thu, 30 Jul 2015 17:52:05 +0000 (18:52 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Thu, 30 Jul 2015 17:52:05 +0000 (18:52 +0100)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/frontends/magic_folder.py

index 0f24c5487d60bf02c29b637220a0eeeef3bf4d14..c5195f63150847dbf2e1faa8772424debb818679 100644 (file)
@@ -104,13 +104,16 @@ class QueueMixin(object):
         self._pending = set()
         self._callback = lambda ign: None
         self._ignore_count = 0
+        self._stopped = False
+        self._turn_delay = 0
 
     def _count(self, counter_name, delta=1):
         self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)
 
     def _log(self, msg):
-        self._client.log("Magic Folder %s: %s" % (self._name, msg))
-        #print "_log %s" % (msg,)
+        s = "Magic Folder %s: %s" % (self._name, msg)
+        self._client.log(s)
+        print s
         #open("events", "ab+").write(msg)
 
     def _append_to_deque(self, path):
@@ -123,14 +126,17 @@ class QueueMixin(object):
             reactor.callLater(0, self._turn_deque)
 
     def _turn_deque(self):
+        if self._stopped:
+            return
         try:
-            path = self._deque.pop()
+            item = self._deque.pop()
         except IndexError:
-            self._log("magic folder upload deque is now empty")
-            self._lazy_tail = defer.succeed(None)
-            return
-        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
-        self._lazy_tail.addCallback(lambda ign: self._turn_deque())
+            self._log("deque is now empty")
+            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+        else:
+            self._lazy_tail.addCallback(lambda ign: self._process(item))
+            #self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,)))
+            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
 
     def _do_callback(self, res):
         if self._ignore_count == 0:
@@ -270,6 +276,9 @@ class Uploader(QueueMixin):
         path_u = unicode_from_filepath(path)
         self._append_to_deque(path_u)
 
+    def _when_queue_is_empty(self):
+        return defer.succeed(None)
+
     def _process(self, path_u):
         precondition(isinstance(path_u, unicode), path_u)
         d = defer.succeed(None)
@@ -373,9 +382,8 @@ class Downloader(QueueMixin):
         if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
             raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
 
-        self._remote_scan_delay = 3 # XXX
+        self._turn_delay = 3 # delay between remote scans
         self._download_scan_batch = {} # path -> [(filenode, metadata)]
-        self._stopped = False
 
     def start_scanning(self):
         self._scan_remote_collective()
@@ -493,7 +501,13 @@ class Downloader(QueueMixin):
                 extension += [(name, file_node, metadata)]
         return extension
 
-    def _download_file(self, name, file_node):
+    def _when_queue_is_empty(self):
+        d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective)
+        d.addCallback(lambda ign: self._turn_deque())
+        return d
+
+    def _process(self, item):
+        (name, file_node, metadata) = item
         d = file_node.download_best_version()
         def succeeded(res):
             d.addCallback(lambda result: self._write_downloaded_file(name, result))
@@ -522,18 +536,3 @@ class Downloader(QueueMixin):
         self._count('download_objects_queued')
         if self.is_ready:
             reactor.callLater(0, self._turn_deque)
-
-    # FIXME move to QueueMixin
-    def _turn_deque(self):
-        if self._stopped:
-            return
-        try:
-            file_path, file_node, metadata = self._deque.pop()
-        except IndexError:
-            self._log("magic folder upload deque is now empty")
-            self._lazy_tail = defer.succeed(None)
-            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
-            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque))
-            return
-        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
-        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))