Simplify _scan_remote_* and remove Downloader._download_scan_batch attribute.
authorDaira Hopwood <daira@jacaranda.org>
Tue, 3 Nov 2015 02:31:34 +0000 (02:31 +0000)
committerDaira Hopwood <daira@jacaranda.org>
Tue, 3 Nov 2015 02:31:34 +0000 (02:31 +0000)
Signed-off-by: Daira Hopwood <daira@jacaranda.org>
src/allmydata/frontends/magic_folder.py

index cf23afd1c3fa28146144b04e43b5d01983da9a6c..6d77ceb38047456cf298c76cb71c814086a6c0d3 100644 (file)
@@ -515,7 +515,6 @@ class Downloader(QueueMixin, WriteFileMixin):
         self._upload_readonly_dircap = upload_readonly_dircap
 
         self._turn_delay = self.REMOTE_SCAN_INTERVAL
-        self._download_scan_batch = {} # path -> [(filenode, metadata)]
 
     def start_scanning(self):
         self._log("start_scanning")
@@ -589,14 +588,8 @@ class Downloader(QueueMixin, WriteFileMixin):
         collective_dirmap_d.addCallback(highest_version)
         return collective_dirmap_d
 
-    def _append_to_batch(self, name, file_node, metadata):
-        if self._download_scan_batch.has_key(name):
-            self._download_scan_batch[name] += [(file_node, metadata)]
-        else:
-            self._download_scan_batch[name] = [(file_node, metadata)]
-
-    def _scan_remote(self, nickname, dirnode):
-        self._log("_scan_remote nickname %r" % (nickname,))
+    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
+        self._log("_scan_remote_dmd nickname %r" % (nickname,))
         d = dirnode.list()
         def scan_listing(listing_map):
             for encoded_relpath_u in listing_map.keys():
@@ -607,16 +600,21 @@ class Downloader(QueueMixin, WriteFileMixin):
                 local_version = self._get_local_latest(relpath_u)
                 remote_version = metadata.get('version', None)
                 self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
+
                 if local_version is None or remote_version is None or local_version < remote_version:
                     self._log("%r added to download queue" % (relpath_u,))
-                    self._append_to_batch(relpath_u, file_node, metadata)
+                    if scan_batch.has_key(relpath_u):
+                        scan_batch[relpath_u] += [(file_node, metadata)]
+                    else:
+                        scan_batch[relpath_u] = [(file_node, metadata)]
+
         d.addCallback(scan_listing)
-        d.addBoth(self._logcb, "end of _scan_remote")
+        d.addBoth(self._logcb, "end of _scan_remote_dmd")
         return d
 
     def _scan_remote_collective(self):
         self._log("_scan_remote_collective")
-        self._download_scan_batch = {} # XXX
+        scan_batch = {}  # path -> [(filenode, metadata)]
 
         d = self._collective_dirnode.list()
         def scan_collective(dirmap):
@@ -624,38 +622,31 @@ class Downloader(QueueMixin, WriteFileMixin):
             for dir_name in dirmap:
                 (dirnode, metadata) = dirmap[dir_name]
                 if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
-                    d2.addCallback(lambda ign, dir_name=dir_name: self._scan_remote(dir_name, dirnode))
+                    d2.addCallback(lambda ign, dir_name=dir_name:
+                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                     def _err(f):
                         self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                         # XXX what should we do to make this failure more visible to users?
                     d2.addErrback(_err)
+
             return d2
         d.addCallback(scan_collective)
-        d.addCallback(self._filter_scan_batch)
-        d.addCallback(self._add_batch_to_download_queue)
-        return d
 
-    def _add_batch_to_download_queue(self, result):
-        self._log("result = %r" % (result,))
-        self._log("deque = %r" % (self._deque,))
-        self._deque.extend(result)
-        self._log("deque after = %r" % (self._deque,))
-        self._count('objects_queued', len(result))
-
-    def _filter_scan_batch(self, result):
-        self._log("_filter_scan_batch")
-        extension = [] # consider whether this should be a dict
-        for relpath_u in self._download_scan_batch.keys():
-            if relpath_u in self._pending:
-                continue
-            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
-            if self._should_download(relpath_u, metadata['version']):
-                extension += [(relpath_u, file_node, metadata)]
-            else:
-                self._log("Excluding %r" % (relpath_u,))
-                self._count('objects_excluded')
-                self._call_hook(None, 'processed')
-        return extension
+        def _filter_batch_to_deque(ign):
+            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
+            for relpath_u in scan_batch.keys():
+                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
+
+                if self._should_download(relpath_u, metadata['version']):
+                    self._deque.append( (relpath_u, file_node, metadata) )
+                else:
+                    self._log("Excluding %r" % (relpath_u,))
+                    self._count('objects_excluded')
+                    self._call_hook(None, 'processed')
+
+            self._log("deque after = %r" % (self._deque,))
+        d.addCallback(_filter_batch_to_deque)
+        return d
 
     def _when_queue_is_empty(self):
         d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)