From: Daira Hopwood <daira@jacaranda.org>
Date: Thu, 23 Jul 2015 23:27:26 +0000 (+0100)
Subject: Refactor MagicFolder into Uploader and Downloader classes (WIP). refs ticket:2477
X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/simplejson/about.html?a=commitdiff_plain;h=0b80ffaa81617ec802a7224e5b4cbdc5a2c5a029;p=tahoe-lafs%2Ftahoe-lafs.git

Refactor MagicFolder into Uploader and Downloader classes (WIP). refs ticket:2477

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
---

diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py
index 7532f2b1..68107f3f 100644
--- a/src/allmydata/frontends/magic_folder.py
+++ b/src/allmydata/frontends/magic_folder.py
@@ -46,55 +46,93 @@ class MagicFolder(service.MultiService):
         precondition_abspath(local_dir)
 
         service.MultiService.__init__(self)
-        self._stopped = False
-        self._remote_scan_delay = 3 # XXX
-        self._local_dir = local_dir
-        self._upload_lazy_tail = defer.succeed(None)
-        self._upload_pending = set()
-        self._download_scan_batch = {} # path -> [(filenode, metadata)]
-        self._download_lazy_tail = defer.succeed(None)
-        self._download_pending = set()
-        self._collective_dirnode = None
-        self._client = client
-        self._stats_provider = client.stats_provider
-        self._convergence = client.convergence
-        self._local_path = to_filepath(self._local_dir)
-        self._dbfile = dbfile
+        local_path = to_filepath(local_dir)
 
-        self._download_deque = deque()
-        self._upload_deque = deque()
-        self.is_ready = False
+        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
+        if db is None:
+            return Failure(Exception('ERROR: Unable to load magic folder db.'))
 
-        self._inotify = inotify or get_inotify_module()
+        self.is_ready = False
 
-        if not self._local_path.exists():
+        if not local_path.exists():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but there is no directory at that location."
                                  % quote_local_unicode_path(local_dir))
-        if not self._local_path.isdir():
+        if not local_path.isdir():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but the thing at that location is not a directory."
                                  % quote_local_unicode_path(local_dir))
 
+        self.uploader = Uploader(client, local_path, db, upload_dircap, inotify, pending_delay)
+        self.downloader = Downloader(client, local_path, db, collective_dircap)
+
+    def startService(self):
+        service.MultiService.startService(self)
+        return self.uploader.start_monitoring()
+
+    def ready(self):
+        """ready is used to signal us to start
+        processing the upload and download items...
+        """
+        self.is_ready = True
+        self.uploader.start_scanning()
+        self.downloader.start_scanning()
+
+    def finish(self):
+        d = self.uploader.stop()
+        d.addBoth(lambda ign: self.downloader.stop())
+        return d
+
+    def remove_service(self):
+        return service.MultiService.disownServiceParent(self)
+
+
+class QueueMixin(object):
+    def __init__(self, client, counter, local_path, db):
+        self._client = client
+        self._counter = client.stats_provider.count
+        self._local_path = local_path
+        self._db = db
+
+        self._deque = deque()
+        self._lazy_tail = defer.succeed(None)
+        self._pending = set()
+        self._processed_callback = lambda ign: None
+        self._ignore_count = 0
+
+    def _do_callback(self, res):
+        if self._ignore_count == 0:
+            self._callback(res)
+        else:
+            self._ignore_count -= 1
+        return None  # intentionally suppress failures, which have already been logged
+
+    def set_callback(self, callback, ignore_count=0):
+        """
+        set_callback sets a function that will be called after a filesystem change
+        (either local or remote) has been processed, successfully or unsuccessfully.
+        """
+        self._callback = callback
+        self._ignore_count = ignore_count
+
+    def _log(self, msg):
+        self._client.log("Magic Folder: " + msg)
+        #print "_log %s" % (msg,)
+        #open("events", "ab+").write(msg)
+
+
+class Uploader(QueueMixin):
+    def __init__(self, client, local_path, db, upload_dircap, inotify, pending_delay):
+        QueueMixin.__init__(self, client, local_path, db)
+
         # TODO: allow a path rather than a cap URI.
         self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
-        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
-
         if not IDirectoryNode.providedBy(self._upload_dirnode):
             raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
         if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
             raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
 
-        if not IDirectoryNode.providedBy(self._collective_dirnode):
-            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
-        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
-            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
-
-        self._processed_callback = lambda ign: None
-        self._download_callback = lambda ign: None
-        self._upload_ignore_count = 0
-        self._download_ignore_count = 0
-
+        self._inotify = inotify or get_inotify_module()
         self._notifier = self._inotify.INotify()
 
         if hasattr(self._notifier, 'set_pending_delay'):
@@ -115,126 +153,24 @@ class MagicFolder(service.MultiService):
         self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify],
                              recursive=True)
 
-    def _should_download(self, relpath_u, remote_version):
-        """
-        _should_download returns a bool indicating whether or not a remote object should be downloaded.
-        We check the remote metadata version against our magic-folder db version number;
-        latest version wins.
-        """
-        v = self._db.get_local_file_version(relpath_u)
-        return (v is None or v < remote_version)
-
-    def _get_collective_latest_file(self, filename):
-        """_get_collective_latest_file takes a file path pointing to a file managed by
-        magic-folder and returns a deferred that fires with the two tuple containing a
-        file node and metadata for the latest version of the file located in the
-        magic-folder collective directory.
-        """
-        upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
-        collective_dirmap_d = self._collective_dirnode.list()
-        def do_filter(result):
-            print result
-            others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap]
-            return result, others
-        collective_dirmap_d.addCallback(do_filter)
-        def scan_collective(result):
-            list_of_deferreds = []
-            collective_dirmap, others_list = result
-            for dir_name in result:
-                # XXX make sure it's a directory
-                d = defer.succeed(None)
-                d.addCallback(lambda x, dir_name=dir_name: collective_dirmap[dir_name][0].get_child_and_metadata(filename))
-                list_of_deferreds.append(d)
-            deferList = defer.DeferredList(list_of_deferreds)
-            return deferList
-        collective_dirmap_d.addCallback(scan_collective)
-        def highest_version(deferredList):
-            max_version = 0
-            metadata = None
-            node = None
-            for success, result in deferredList:
-                if success:
-                    if result[1]['version'] > max_version:
-                        node, metadata = result
-                        max_version = result[1]['version']
-            return node, metadata
-        collective_dirmap_d.addCallback(highest_version)
-        return collective_dirmap_d
+    def start_monitoring(self):
+        d = self._notifier.startReading()
+        self._counter('magic_folder.dirs_monitored', 1)
+        return d
 
-    def _scan_remote(self, nickname, dirnode):
-        listing_d = dirnode.list()
-        self._download_scan_batch = {}
-        def scan_listing(listing_map):
-            for name in listing_map.keys():
-                file_node, metadata = listing_map[name]
-                if self._download_scan_batch.has_key(name):
-                    self._download_scan_batch[name] += [(file_node, metadata)]
-                else:
-                    self._download_scan_batch[name] = [(file_node, metadata)]
-        listing_d.addCallback(scan_listing)
-        return listing_d
+    def stop(self):
+        self._notifier.stopReading()
+        self._counter('magic_folder.dirs_monitored', -1)
 
-    def _scan_remote_collective(self):
-        if self._collective_dirnode is None:
-            return
-        upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
-        collective_dirmap_d = self._collective_dirnode.list()
-        def do_filter(result):
-            others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap]
-            return result, others
-        collective_dirmap_d.addCallback(do_filter)
-        def scan_collective(result):
+        if hasattr(self._notifier, 'wait_until_stopped'):
+            d = self._notifier.wait_until_stopped()
+        else:
             d = defer.succeed(None)
-            collective_dirmap, others_list = result
-            for dir_name in others_list:
-                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
-                # XXX todo add errback
-            return d
-        collective_dirmap_d.addCallback(scan_collective)
-        collective_dirmap_d.addCallback(self._filter_scan_batch)
-        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
-        return collective_dirmap_d
-
-    def _add_batch_to_download_queue(self, result):
-        self._download_deque.extend(result)
-        self._download_pending.update(map(lambda x: x[0], result))
-
-    def _filter_scan_batch(self, result):
-        extension = [] # consider whether this should be a dict
-        for name in self._download_scan_batch.keys():
-            if name in self._download_pending:
-                continue
-            file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
-            if self._should_download(name, metadata['version']):
-                extension += [(name, file_node, metadata)]
-        return extension
-
-    def _download_file(self, name, file_node):
-        d = file_node.download_best_version()
-        def succeeded(res):
-            d.addCallback(lambda result: self._write_downloaded_file(name, result))
-            self._stats_provider.count('magic_folder.objects_downloaded', 1)
-            return None
-        def failed(f):
-            self._log("download failed")
-            self._stats_provider.count('magic_folder.objects_download_failed', 1)
-            return f
-        def remove_from_pending(result):
-            self._download_pending = self._download_pending.difference(set([name]))
-        d.addCallbacks(succeeded, failed)
-        d.addBoth(self._do_download_callback)
-        d.addBoth(remove_from_pending)
         return d
 
-    def _write_downloaded_file(self, name, file_contents):
-        print "_write_downloaded_file: no-op."
-
-    def _db_file_is_uploaded(self, childpath):
-        """_db_file_is_uploaded returns true if the file was previously uploaded
-        """
-        assert self._db != None
-        r = self._db.check_file(childpath)
-        return r.was_uploaded()
+    def start_scanning(self):
+        self._scan(self._local_dir)
+        self._turn_deque()
 
     def _scan(self, localpath):
         if not os.path.isdir(localpath):
@@ -263,7 +199,7 @@ class MagicFolder(service.MultiService):
                     return None
                 elif isdir:
                     # process directories unconditionally
-                    self._append_to_upload_deque(childpath)
+                    self._append_to_deque(childpath)
 
                     # recurse on the child directory
                     return self._scan(childpath)
@@ -271,7 +207,7 @@ class MagicFolder(service.MultiService):
                     file_version = self._db.get_local_file_version(childpath)
                     if file_version is None:
                         # XXX upload if we didn't record our version in magicfolder db?
-                        self._append_to_upload_deque(childpath)
+                        self._append_to_deque(childpath)
                         return None
                     else:
                         d2 = self._get_collective_latest_file(childpath)
@@ -281,7 +217,7 @@ class MagicFolder(service.MultiService):
                                 return None
                             if file_version > collective_version:
                                 self._append_to_upload_deque(childpath)
-                            elif file_version < collective_version:
+                            elif file_version < collective_version: # FIXME Daira thinks this is wrong
                                 # if a collective version of the file is newer than ours
                                 # we must download it and unlink the old file from our upload dirnode
                                 self._append_to_download_deque(childpath)
@@ -300,72 +236,31 @@ class MagicFolder(service.MultiService):
 
         return d
 
-    def startService(self):
-        self._db = backupdb.get_backupdb(self._dbfile, create_version=(backupdb.SCHEMA_v3, 3))
-        if self._db is None:
-            return Failure(Exception('ERROR: Unable to load magic folder db.'))
-
-        service.MultiService.startService(self)
-        d = self._notifier.startReading()
-        self._stats_provider.count('magic_folder.dirs_monitored', 1)
-        return d
-
-    def ready(self):
-        """ready is used to signal us to start
-        processing the upload and download items...
-        """
-        self.is_ready = True
-        self._scan(self._local_dir)
-        self._scan_remote_collective()
-        self._turn_upload_deque()
-        self._turn_download_deque()
-
-    def _turn_download_deque(self):
-        if self._stopped:
-            return
-        try:
-            file_path, file_node, metadata = self._download_deque.pop()
-        except IndexError:
-            self._log("magic folder upload deque is now empty")
-            self._download_lazy_tail = defer.succeed(None)
-            self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
-            self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_download_deque))
-            return
-        self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
-        self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
-
-    def _append_to_download_deque(self, path):
-        if path in self._download_scan_batch.keys():
-            return
-        self._download_deque.append(path)
-        self._download_pending.add(path)
-        self._stats_provider.count('magic_folder.download_objects_queued', 1)
-        if self.is_ready:
-            reactor.callLater(0, self._turn_download_deque)
-
-    def _append_to_upload_deque(self, path):
-        if path in self._upload_pending:
+    # FIXME move to QueueMixin
+    def _append_to_deque(self, path):
+        if path in self._pending:
             return
-        self._upload_deque.append(path)
-        self._upload_pending.add(path)
-        self._stats_provider.count('magic_folder.objects_queued', 1)
+        self._deque.append(path)
+        self._pending.add(path)
+        self._counter('magic_folder.objects_queued', 1)
         if self.is_ready:
-            reactor.callLater(0, self._turn_upload_deque)
+            reactor.callLater(0, self._turn_deque)
 
-    def _turn_upload_deque(self):
+    # FIXME move to QueueMixin
+    def _turn_deque(self):
         try:
-            path = self._upload_deque.pop()
+            path = self._deque.pop()
         except IndexError:
             self._log("magic folder upload deque is now empty")
-            self._upload_lazy_tail = defer.succeed(None)
+            self._lazy_tail = defer.succeed(None)
             return
-        self._upload_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
-        self._upload_lazy_tail.addCallback(lambda ign: self._turn_upload_deque())
+        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
+        self._lazy_tail.addCallback(lambda ign: self._turn_deque())
 
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
         path_u = unicode_from_filepath(path)
-        self._append_to_upload_deque(path_u)
+        self._append_to_deque(path_u)
 
     def _process(self, path_u):
         precondition(isinstance(path_u, unicode), path_u)
@@ -382,7 +277,7 @@ class MagicFolder(service.MultiService):
             upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
             def _succeeded(ign):
                 self._log("created subdirectory %r" % (path_u,))
-                self._stats_provider.count('magic_folder.directories_created', 1)
+                self._counter('magic_folder.directories_created', 1)
             def _failed(f):
                 self._log("failed to create subdirectory %r" % (path_u,))
                 return f
@@ -405,7 +300,7 @@ class MagicFolder(service.MultiService):
             if not os.path.exists(path_u):
                 self._log("drop-upload: notified object %r disappeared "
                           "(this is normal for temporary objects)" % (path_u,))
-                self._stats_provider.count('magic_folder.objects_disappeared', 1)
+                self._counter('magic_folder.objects_disappeared', 1)
                 d2 = defer.succeed(None)
                 if self._db.check_file_db_exists(relpath_u):
                     d2.addCallback(get_metadata)
@@ -413,8 +308,8 @@ class MagicFolder(service.MultiService):
                         current_version = self._db.get_local_file_version(relpath_u) + 1
                         metadata['version'] = current_version
                         metadata['deleted'] = True
-                        emptyUploadable = Data("", self._convergence)
-                        return self._upload_dirnode.add_file(encoded_name_u, emptyUploadable, overwrite=True, metadata=metadata)
+                        empty_uploadable = Data("", self._convergence)
+                        return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata)
                     d2.addCallback(set_deleted)
                 d2.addCallback(lambda x: Exception("file does not exist"))
                 return d2
@@ -437,7 +332,7 @@ class MagicFolder(service.MultiService):
                     ctime = s[stat.ST_CTIME]
                     mtime = s[stat.ST_MTIME]
                     self._db.did_upload_file(filecap, relpath_u, version, mtime, ctime, size)
-                    self._stats_provider.count('magic_folder.files_uploaded', 1)
+                    self._counter('magic_folder.files_uploaded', 1)
                 d2.addCallback(add_db_entry)
                 return d2
             else:
@@ -446,65 +341,176 @@ class MagicFolder(service.MultiService):
         d.addCallback(_maybe_upload)
 
         def _succeeded(res):
-            self._stats_provider.count('magic_folder.objects_queued', -1)
-            self._stats_provider.count('magic_folder.objects_succeeded', 1)
+            self._counter('magic_folder.objects_queued', -1)
+            self._counter('magic_folder.objects_succeeded', 1)
             return res
         def _failed(f):
-            self._stats_provider.count('magic_folder.objects_queued', -1)
-            self._stats_provider.count('magic_folder.objects_failed', 1)
+            self._counter('magic_folder.objects_queued', -1)
+            self._counter('magic_folder.objects_failed', 1)
             self._log("%r while processing %r" % (f, path_u))
             return f
         d.addCallbacks(_succeeded, _failed)
-        d.addBoth(self._do_processed_callback)
+        d.addBoth(self._do_callback)
         return d
 
-    def _do_download_callback(self, res):
-        if self._download_ignore_count == 0:
-            self._download_callback(res)
-        else:
-            self._download_ignore_count -= 1
-        return None  # intentionally suppress failures, which have already been logged
 
-    def _do_processed_callback(self, res):
-        if self._upload_ignore_count == 0:
-            self._processed_callback(res)
-        else:
-            self._upload_ignore_count -= 1
-        return None  # intentionally suppress failures, which have already been logged
+class Downloader(QueueMixin):
+    def __init__(self, client, local_path, db, collective_dircap):
+        QueueMixin.__init__(self, client, local_path, db)
+
+        # TODO: allow a path rather than a cap URI.
+        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
+
+        if not IDirectoryNode.providedBy(self._collective_dirnode):
+            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
+        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
+            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
+
+        self._remote_scan_delay = 3 # XXX
+        self._download_scan_batch = {} # path -> [(filenode, metadata)]
 
-    def set_download_callback(self, callback, ignore_count=0):
+    def start_scanning(self):
+        self._scan_remote_collective()
+        self._turn_deque()
+
+    def stop(self):
+        return self._lazy_tail
+
+    def _should_download(self, relpath_u, remote_version):
         """
-        set_download_callback sets a function that will be called after a
-        remote filesystem notification has been processed (successfully or unsuccessfully).
+        _should_download returns a bool indicating whether or not a remote object should be downloaded.
+        We check the remote metadata version against our magic-folder db version number;
+        latest version wins.
         """
-        self._download_callback = callback
-        self._download_ignore_count = ignore_count
+        v = self._db.get_local_file_version(relpath_u)
+        return (v is None or v < remote_version)
 
-    def set_processed_callback(self, callback, ignore_count=0):
-        """
-        set_processed_callback sets a function that will be called after a
-        local filesystem notification has been processed (successfully or unsuccessfully).
+    def _get_collective_latest_file(self, filename):
+        """_get_collective_latest_file takes a file path pointing to a file managed by
+        magic-folder and returns a deferred that fires with the two tuple containing a
+        file node and metadata for the latest version of the file located in the
+        magic-folder collective directory.
         """
-        self._processed_callback = callback
-        self._upload_ignore_count = ignore_count
+        upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
+        collective_dirmap_d = self._collective_dirnode.list()
+        def do_filter(result):
+            print result
+            others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap]
+            return result, others
+        collective_dirmap_d.addCallback(do_filter)
+        def scan_collective(result):
+            list_of_deferreds = []
+            collective_dirmap, others_list = result
+            for dir_name in result:
+                # XXX make sure it's a directory
+                d = defer.succeed(None)
+                d.addCallback(lambda x, dir_name=dir_name: collective_dirmap[dir_name][0].get_child_and_metadata(filename))
+                list_of_deferreds.append(d)
+            deferList = defer.DeferredList(list_of_deferreds)
+            return deferList
+        collective_dirmap_d.addCallback(scan_collective)
+        def highest_version(deferredList):
+            max_version = 0
+            metadata = None
+            node = None
+            for success, result in deferredList:
+                if success:
+                    if result[1]['version'] > max_version:
+                        node, metadata = result
+                        max_version = result[1]['version']
+            return node, metadata
+        collective_dirmap_d.addCallback(highest_version)
+        return collective_dirmap_d
 
-    def finish(self, for_tests=False):
-        self._stopped = True
-        self._notifier.stopReading()
-        self._stats_provider.count('magic_folder.dirs_monitored', -1)
+    def _scan_remote(self, nickname, dirnode):
+        listing_d = dirnode.list()
+        self._download_scan_batch = {}
+        def scan_listing(listing_map):
+            for name in listing_map.keys():
+                file_node, metadata = listing_map[name]
+                if self._download_scan_batch.has_key(name):
+                    self._download_scan_batch[name] += [(file_node, metadata)]
+                else:
+                    self._download_scan_batch[name] = [(file_node, metadata)]
+        listing_d.addCallback(scan_listing)
+        return listing_d
 
-        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
-            d = self._notifier.wait_until_stopped()
-        else:
+    def _scan_remote_collective(self):
+        if self._collective_dirnode is None:
+            return
+        upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
+        collective_dirmap_d = self._collective_dirnode.list()
+        def do_filter(result):
+            others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap]
+            return result, others
+        collective_dirmap_d.addCallback(do_filter)
+        def scan_collective(result):
             d = defer.succeed(None)
+            collective_dirmap, others_list = result
+            for dir_name in others_list:
+                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
+                # XXX todo add errback
+            return d
+        collective_dirmap_d.addCallback(scan_collective)
+        collective_dirmap_d.addCallback(self._filter_scan_batch)
+        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
+        return collective_dirmap_d
 
-        d.addCallback(lambda x: self._download_lazy_tail)
+    def _add_batch_to_download_queue(self, result):
+        self._deque.extend(result)
+        self._pending.update(map(lambda x: x[0], result))
+
+    def _filter_scan_batch(self, result):
+        extension = [] # consider whether this should be a dict
+        for name in self._download_scan_batch.keys():
+            if name in self._pending:
+                continue
+            file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
+            if self._should_download(name, metadata['version']):
+                extension += [(name, file_node, metadata)]
+        return extension
+
+    def _download_file(self, name, file_node):
+        d = file_node.download_best_version()
+        def succeeded(res):
+            d.addCallback(lambda result: self._write_downloaded_file(name, result))
+            self._counter('magic_folder.objects_downloaded', 1)
+            return None
+        def failed(f):
+            self._log("download failed")
+            self._counter('magic_folder.objects_download_failed', 1)
+            return f
+        def remove_from_pending(result):
+            self._pending = self._pending.difference(set([name]))
+        d.addCallbacks(succeeded, failed)
+        d.addBoth(self._do_callback)
+        d.addBoth(remove_from_pending)
         return d
 
-    def remove_service(self):
-        return service.MultiService.disownServiceParent(self)
+    def _write_downloaded_file(self, name, file_contents):
+        print "_write_downloaded_file: no-op."
 
-    def _log(self, msg):
-        self._client.log("drop-upload: " + msg)
-        #print "_log %s" % (msg,)
-        #open("events", "ab+").write(msg)
+    # FIXME move to QueueMixin
+    def _append_to_deque(self, path):
+        if path in self._download_scan_batch.keys():
+            return
+        self._deque.append(path)
+        self._pending.add(path)
+        self._counter('magic_folder.download_objects_queued', 1)
+        if self.is_ready:
+            reactor.callLater(0, self._turn_deque)
+
+    # FIXME move to QueueMixin
+    def _turn_deque(self):
+        if self._stopped:
+            return
+        try:
+            file_path, file_node, metadata = self._deque.pop()
+        except IndexError:
+            self._log("magic folder upload deque is now empty")
+            self._lazy_tail = defer.succeed(None)
+            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
+            self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque))
+            return
+        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
+        self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))