From 3180471ce823321b6547c55087ee79b195a69ff4 Mon Sep 17 00:00:00 2001
From: Daira Hopwood <daira@jacaranda.org>
Date: Thu, 1 Oct 2015 22:40:10 +0100
Subject: [PATCH] WIP.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
---
 .../magic-folder/remote-to-local-sync.rst     |  23 +-
 src/allmydata/backupdb.py                     |  63 +-
 src/allmydata/client.py                       |   3 +-
 src/allmydata/frontends/magic_folder.py       | 639 +++++++++++++++---
 src/allmydata/magicpath.py                    |  27 +
 src/allmydata/scripts/common.py               |  14 +-
 src/allmydata/scripts/magic_folder_cli.py     | 182 +++++
 src/allmydata/scripts/runner.py               |   6 +-
 src/allmydata/scripts/tahoe_ls.py             |   2 -
 src/allmydata/test/no_network.py              |  75 +-
 src/allmydata/test/test_cli.py                |   5 +-
 src/allmydata/test/test_cli_backup.py         |   3 +-
 src/allmydata/test/test_cli_magic_folder.py   | 206 ++++++
 src/allmydata/test/test_client.py             |  22 +-
 src/allmydata/test/test_magic_folder.py       | 466 +++++++++++--
 src/allmydata/test/test_magicpath.py          |  28 +
 src/allmydata/test/test_util.py               | 112 +++
 src/allmydata/util/deferredutil.py            |  39 +-
 src/allmydata/util/encodingutil.py            |  50 +-
 src/allmydata/util/fileutil.py                | 100 ++-
 20 files changed, 1798 insertions(+), 267 deletions(-)
 create mode 100644 src/allmydata/magicpath.py
 create mode 100644 src/allmydata/scripts/magic_folder_cli.py
 create mode 100644 src/allmydata/test/test_cli_magic_folder.py
 create mode 100644 src/allmydata/test/test_magicpath.py

diff --git a/docs/proposed/magic-folder/remote-to-local-sync.rst b/docs/proposed/magic-folder/remote-to-local-sync.rst
index 3e05b6cb..d134fb9f 100644
--- a/docs/proposed/magic-folder/remote-to-local-sync.rst
+++ b/docs/proposed/magic-folder/remote-to-local-sync.rst
@@ -396,13 +396,16 @@ and Windows. On Unix, it can be implemented as follows:
 
 Note that, if there is no conflict, the entry for ``foo``
 recorded in the `magic folder db`_ will reflect the ``mtime``
-set in step 3. The link operation in step 4c will cause an
-``IN_CREATE`` event for ``foo``, but this will not trigger an
-upload, because the metadata recorded in the database entry
-will exactly match the metadata for the file's inode on disk.
-(The two hard links — ``foo`` and, while it still exists,
-``.foo.tmp`` — share the same inode and therefore the same
-metadata.)
+set in step 3. The move operation in step 4b will cause a
+``MOVED_FROM`` event for ``foo``, and the link operation in
+step 4c will cause an ``IN_CREATE`` event for ``foo``.
+However, these events will not trigger an upload, because they
+are guaranteed to be processed only after the file replacement
+has finished, at which point the metadata recorded in the
+database entry will exactly match the metadata for the file's
+inode on disk. (The two hard links — ``foo`` and,  while it
+still exists, ``.foo.tmp`` — share the same inode and
+therefore the same metadata.)
 
 .. _`magic folder db`: filesystem_integration.rst#local-scanning-and-database
 
@@ -411,9 +414,9 @@ call to the `ReplaceFileW`_ API (with the
 ``REPLACEFILE_IGNORE_MERGE_ERRORS`` flag).
 
 Similar to the Unix case, the `ReplaceFileW`_ operation will
-cause a change notification for ``foo``. The replaced ``foo``
-has the same ``mtime`` as the replacement file, and so this
-notification will not trigger an unwanted upload.
+cause one or more change notifications for ``foo``. The replaced
+``foo`` has the same ``mtime`` as the replacement file, and so any
+such notification(s) will not trigger an unwanted upload.
 
 .. _`ReplaceFileW`: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx
 
diff --git a/src/allmydata/backupdb.py b/src/allmydata/backupdb.py
index a51356ca..8fecb854 100644
--- a/src/allmydata/backupdb.py
+++ b/src/allmydata/backupdb.py
@@ -21,7 +21,8 @@ CREATE TABLE version
 CREATE TABLE local_files
 (
  path  VARCHAR(1024) PRIMARY KEY, -- index, this is an absolute UTF-8-encoded local filename
- size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]
+ -- note that size is before mtime and ctime here, but after in function parameters
+ size  INTEGER,       -- os.stat(fn)[stat.ST_SIZE]   (NULL if the file has been deleted)
  mtime NUMBER,        -- os.stat(fn)[stat.ST_MTIME]
  ctime NUMBER,        -- os.stat(fn)[stat.ST_CTIME]
  fileid INTEGER%s
@@ -186,9 +187,9 @@ class BackupDB:
         is not healthy, please upload the file and call r.did_upload(filecap)
         when you're done.
 
-        If use_timestamps=True (the default), I will compare ctime and mtime
+        If use_timestamps=True (the default), I will compare mtime and ctime
         of the local file against an entry in my database, and consider the
-        file to be unchanged if ctime, mtime, and filesize are all the same
+        file to be unchanged if mtime, ctime, and filesize are all the same
         as the earlier version. If use_timestamps=False, I will not trust the
         timestamps, so more files (perhaps all) will be marked as needing
         upload. A future version of this database may hash the file to make
@@ -200,10 +201,12 @@ class BackupDB:
         """
 
         path = abspath_expanduser_unicode(path)
+
+        # XXX consider using get_pathinfo
         s = os.stat(path)
         size = s[stat.ST_SIZE]
-        ctime = s[stat.ST_CTIME]
         mtime = s[stat.ST_MTIME]
+        ctime = s[stat.ST_CTIME]
 
         now = time.time()
         c = self.cursor
@@ -368,34 +371,33 @@ class BackupDB:
 class MagicFolderDB(BackupDB):
     VERSION = 3
 
-    def get_all_files(self):
-        """Retreive a list of all files that have had an entry in magic-folder db
-        (files that have been downloaded at least once).
+    def get_all_relpaths(self):
+        """
+        Retrieve a list of all relpaths of files that have had an entry in magic folder db
+        (i.e. that have been downloaded at least once).
         """
         self.cursor.execute("SELECT path FROM local_files")
         rows = self.cursor.fetchall()
-        if not rows:
-            return None
-        else:
-            return rows
+        return set([r[0] for r in rows])
 
-    def get_local_file_version(self, path):
-        """I will tell you the version of a local file tracked by our magic folder db.
-        If no db entry found then I'll return None.
+    def get_local_file_version(self, relpath_u):
+        """
+        Return the version of a local file tracked by our magic folder db.
+        If no db entry is found then return None.
         """
         c = self.cursor
         c.execute("SELECT version, fileid"
                   " FROM local_files"
                   " WHERE path=?",
-                  (path,))
+                  (relpath_u,))
         row = self.cursor.fetchone()
         if not row:
             return None
         else:
             return row[0]
 
-    def did_upload_file(self, filecap, path, version, mtime, ctime, size):
-        #print "_did_upload_file(%r, %r, %r, %r, %r, %r)" % (filecap, path, version, mtime, ctime, size)
+    def did_upload_version(self, filecap, relpath_u, version, pathinfo):
+        #print "did_upload_version(%r, %r, %r, %r)" % (filecap, relpath_u, version, pathinfo)
         now = time.time()
         fileid = self.get_or_allocate_fileid_for_cap(filecap)
         try:
@@ -408,35 +410,26 @@ class MagicFolderDB(BackupDB):
                                 (now, now, fileid))
         try:
             self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
-                                (path, size, mtime, ctime, fileid, version))
+                                (relpath_u, pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version))
         except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
             self.cursor.execute("UPDATE local_files"
                                 " SET size=?, mtime=?, ctime=?, fileid=?, version=?"
                                 " WHERE path=?",
-                                (size, mtime, ctime, fileid, version, path))
+                                (pathinfo.size, pathinfo.mtime, pathinfo.ctime, fileid, version, relpath_u))
         self.connection.commit()
 
-    def is_new_file_time(self, path, relpath_u):
-        """recent_file_time returns true if the file is recent...
-        meaning its current statinfo (i.e. size, ctime, and mtime) matched the statinfo
-        that was previously stored in the db.
+    def is_new_file(self, pathinfo, relpath_u):
         """
-        #print "check_file_time %s %s" % (path, relpath_u)
-        path = abspath_expanduser_unicode(path)
-        s = os.stat(path)
-        size = s[stat.ST_SIZE]
-        ctime = s[stat.ST_CTIME]
-        mtime = s[stat.ST_MTIME]
+        Returns true if the file's current pathinfo (size, mtime, and ctime) has
+        changed from the pathinfo previously stored in the db.
+        """
+        #print "is_new_file(%r, %r)" % (pathinfo, relpath_u)
         c = self.cursor
-        c.execute("SELECT size,mtime,ctime,fileid"
+        c.execute("SELECT size, mtime, ctime"
                   " FROM local_files"
                   " WHERE path=?",
                   (relpath_u,))
         row = self.cursor.fetchone()
         if not row:
             return True
-        (last_size,last_mtime,last_ctime,last_fileid) = row
-        if (size, ctime, mtime) == (last_size, last_ctime, last_mtime):
-            return False
-        else:
-            return True
+        return (pathinfo.size, pathinfo.mtime, pathinfo.ctime) != row
diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index d23b8cfd..63b87288 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -129,6 +129,7 @@ class Client(node.Node, pollmixin.PollMixin):
                                    }
 
     def __init__(self, basedir="."):
+        #print "Client.__init__(%r)" % (basedir,)
         node.Node.__init__(self, basedir)
         self.connected_enough_d = defer.Deferred()
         self.started_timestamp = time.time()
@@ -511,7 +512,7 @@ class Client(node.Node, pollmixin.PollMixin):
 
             from allmydata.frontends import magic_folder
 
-            s = magic_folder.MagicFolder(self, upload_dircap, local_dir, dbfile)
+            s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile)
             s.setServiceParent(self)
             s.startService()
 
diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py
index ee820b87..422a7c11 100644
--- a/src/allmydata/frontends/magic_folder.py
+++ b/src/allmydata/frontends/magic_folder.py
@@ -1,149 +1,596 @@
 
-import sys
+import sys, os
+import os.path
+from collections import deque
+import time
 
-from twisted.internet import defer
-from twisted.python.filepath import FilePath
+from twisted.internet import defer, reactor, task
+from twisted.python.failure import Failure
+from twisted.python import runtime
 from twisted.application import service
-from foolscap.api import eventually
 
+from allmydata.util import fileutil
 from allmydata.interfaces import IDirectoryNode
+from allmydata.util import log
+from allmydata.util.fileutil import precondition_abspath, get_pathinfo
+from allmydata.util.assertutil import precondition
+from allmydata.util.deferredutil import HookMixin
+from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
+     extend_filepath, unicode_from_filepath, unicode_segments_from, \
+     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
+from allmydata.immutable.upload import FileName, Data
+from allmydata import backupdb, magicpath
 
-from allmydata.util.fileutil import abspath_expanduser_unicode, precondition_abspath
-from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
-     unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
-from allmydata.immutable.upload import FileName
-from allmydata import backupdb
 
+IN_EXCL_UNLINK = 0x04000000L
+
+def get_inotify_module():
+    try:
+        if sys.platform == "win32":
+            from allmydata.windows import inotify
+        elif runtime.platform.supportsINotify():
+            from twisted.internet import inotify
+        else:
+            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
+                                      "This currently requires Linux or Windows.")
+        return inotify
+    except (ImportError, AttributeError) as e:
+        log.msg(e)
+        if sys.platform == "win32":
+            raise NotImplementedError("filesystem notification needed for drop-upload is not supported.\n"
+                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
+        raise
 
 
 class MagicFolder(service.MultiService):
     name = 'magic-folder'
 
-    def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
-                 pending_delay=1.0):
-        precondition_abspath(local_dir)
+    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
+                 pending_delay=1.0, clock=reactor):
+        precondition_abspath(local_path_u)
 
         service.MultiService.__init__(self)
-        self._local_dir = abspath_expanduser_unicode(local_dir)
+
+        db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
+        if db is None:
+            return Failure(Exception('ERROR: Unable to load magic folder db.'))
+
+        # for tests
         self._client = client
-        self._stats_provider = client.stats_provider
-        self._convergence = client.convergence
-        self._local_path = to_filepath(self._local_dir)
-        self._dbfile = dbfile
+        self._db = db
 
-        self.is_upload_ready = False
+        self.is_ready = False
 
-        if inotify is None:
-            if sys.platform == "win32":
-                from allmydata.windows import inotify
-            else:
-                from twisted.internet import inotify
-        self._inotify = inotify
+        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
+        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
+
+    def startService(self):
+        # TODO: why is this being called more than once?
+        if self.running:
+            return defer.succeed(None)
+        print "%r.startService" % (self,)
+        service.MultiService.startService(self)
+        return self.uploader.start_monitoring()
+
+    def ready(self):
+        """ready is used to signal us to start
+        processing the upload and download items...
+        """
+        self.is_ready = True
+        d = self.uploader.start_scanning()
+        d2 = self.downloader.start_scanning()
+        d.addCallback(lambda ign: d2)
+        return d
 
-        if not self._local_path.exists():
+    def finish(self):
+        print "finish"
+        d = self.uploader.stop()
+        d2 = self.downloader.stop()
+        d.addCallback(lambda ign: d2)
+        return d
+
+    def remove_service(self):
+        return service.MultiService.disownServiceParent(self)
+
+
+class QueueMixin(HookMixin):
+    def __init__(self, client, local_path_u, db, name, clock):
+        self._client = client
+        self._local_path_u = local_path_u
+        self._local_filepath = to_filepath(local_path_u)
+        self._db = db
+        self._name = name
+        self._clock = clock
+        self._hooks = {'processed': None, 'started': None}
+        self.started_d = self.set_hook('started')
+
+        if not self._local_filepath.exists():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but there is no directory at that location."
-                                 % quote_local_unicode_path(local_dir))
-        if not self._local_path.isdir():
+                                 % quote_local_unicode_path(self._local_path_u))
+        if not self._local_filepath.isdir():
             raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                  "but the thing at that location is not a directory."
-                                 % quote_local_unicode_path(local_dir))
+                                 % quote_local_unicode_path(self._local_path_u))
+
+        self._deque = deque()
+        self._lazy_tail = defer.succeed(None)
+        self._pending = set()
+        self._stopped = False
+        self._turn_delay = 0
+
+    def _get_filepath(self, relpath_u):
+        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
+
+    def _get_relpath(self, filepath):
+        print "_get_relpath(%r)" % (filepath,)
+        segments = unicode_segments_from(filepath, self._local_filepath)
+        print "segments = %r" % (segments,)
+        return u"/".join(segments)
+
+    def _count(self, counter_name, delta=1):
+        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
+        print "%r += %r" % (ctr, delta)
+        self._client.stats_provider.count(ctr, delta)
+
+    def _log(self, msg):
+        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
+        self._client.log(s)
+        print s
+        #open("events", "ab+").write(msg)
+
+    def _append_to_deque(self, relpath_u):
+        print "_append_to_deque(%r)" % (relpath_u,)
+        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
+            return
+        self._deque.append(relpath_u)
+        self._pending.add(relpath_u)
+        self._count('objects_queued')
+        if self.is_ready:
+            self._clock.callLater(0, self._turn_deque)
+
+    def _turn_deque(self):
+        if self._stopped:
+            return
+        try:
+            item = self._deque.pop()
+            self._count('objects_queued', -1)
+        except IndexError:
+            self._log("deque is now empty")
+            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
+        else:
+            self._lazy_tail.addCallback(lambda ign: self._process(item))
+            self._lazy_tail.addBoth(self._call_hook, 'processed')
+            self._lazy_tail.addErrback(log.err)
+            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
+
+
+class Uploader(QueueMixin):
+    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
+        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
+
+        self.is_ready = False
 
         # TODO: allow a path rather than a cap URI.
-        self._parent = self._client.create_node_from_uri(upload_dircap)
-        if not IDirectoryNode.providedBy(self._parent):
+        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
+        if not IDirectoryNode.providedBy(self._upload_dirnode):
             raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
-        if self._parent.is_unknown() or self._parent.is_readonly():
+        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
             raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")
 
-        self._uploaded_callback = lambda ign: None
+        self._inotify = get_inotify_module()
+        self._notifier = self._inotify.INotify()
 
-        self._notifier = inotify.INotify()
         if hasattr(self._notifier, 'set_pending_delay'):
             self._notifier.set_pending_delay(pending_delay)
 
         # We don't watch for IN_CREATE, because that would cause us to read and upload a
         # possibly-incomplete file before the application has closed it. There should always
         # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
-        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
-        mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
-        self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])
-
-    def _check_db_file(self, childpath):
-        # returns True if the file must be uploaded.
-        assert self._db != None
-        r = self._db.check_file(childpath)
-        filecap = r.was_uploaded()
-        if filecap is False:
-            return True
+        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
+        #
+        self.mask = ( self._inotify.IN_CLOSE_WRITE
+                    | self._inotify.IN_MOVED_TO
+                    | self._inotify.IN_MOVED_FROM
+                    | self._inotify.IN_DELETE
+                    | self._inotify.IN_ONLYDIR
+                    | IN_EXCL_UNLINK
+                    )
+        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
+                             recursive=True)
 
-    def startService(self):
-        self._db = backupdb.get_backupdb(self._dbfile)
-        if self._db is None:
-            return Failure(Exception('ERROR: Unable to load magic folder db.'))
+    def start_monitoring(self):
+        self._log("start_monitoring")
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self._notifier.startReading())
+        d.addCallback(lambda ign: self._count('dirs_monitored'))
+        d.addBoth(self._call_hook, 'started')
+        return d
 
-        service.MultiService.startService(self)
-        d = self._notifier.startReading()
-        self._stats_provider.count('drop_upload.dirs_monitored', 1)
+    def stop(self):
+        self._log("stop")
+        self._notifier.stopReading()
+        self._count('dirs_monitored', -1)
+        if hasattr(self._notifier, 'wait_until_stopped'):
+            d = self._notifier.wait_until_stopped()
+        else:
+            d = defer.succeed(None)
+        d.addCallback(lambda ign: self._lazy_tail)
         return d
 
-    def upload_ready(self):
-        """upload_ready is used to signal us to start
-        processing the upload items...
-        """
-        self.is_upload_ready = True
+    def start_scanning(self):
+        self._log("start_scanning")
+        self.is_ready = True
+        self._pending = self._db.get_all_relpaths()
+        print "all_files %r" % (self._pending)
+        d = self._scan(u"")
+        def _add_pending(ign):
+            # This adds all of the files that were in the db but not already processed
+            # (normally because they have been deleted on disk).
+            print "adding %r" % (self._pending)
+            self._deque.extend(self._pending)
+        d.addCallback(_add_pending)
+        d.addCallback(lambda ign: self._turn_deque())
+        return d
+
+    def _scan(self, reldir_u):
+        self._log("scan %r" % (reldir_u,))
+        fp = self._get_filepath(reldir_u)
+        try:
+            children = listdir_filepath(fp)
+        except EnvironmentError:
+            raise Exception("WARNING: magic folder: permission denied on directory %s"
+                            % quote_filepath(fp))
+        except FilenameEncodingError:
+            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
+                            % quote_filepath(fp))
+
+        d = defer.succeed(None)
+        for child in children:
+            assert isinstance(child, unicode), child
+            d.addCallback(lambda ign, child=child:
+                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
+            def _add_pending(relpath_u):
+                if magicpath.should_ignore_file(relpath_u):
+                    return None
+
+                self._pending.add(relpath_u)
+                return relpath_u
+            d.addCallback(_add_pending)
+            # This call to _process doesn't go through the deque, and probably should.
+            d.addCallback(self._process)
+            d.addBoth(self._call_hook, 'processed')
+            d.addErrback(log.err)
+
+        return d
 
     def _notify(self, opaque, path, events_mask):
         self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
+        relpath_u = self._get_relpath(path)
+        self._append_to_deque(relpath_u)
+
+    def _when_queue_is_empty(self):
+        return defer.succeed(None)
 
-        self._stats_provider.count('drop_upload.files_queued', 1)
-        eventually(self._process, opaque, path, events_mask)
+    def _process(self, relpath_u):
+        self._log("_process(%r)" % (relpath_u,))
+        if relpath_u is None:
+            return
+        precondition(isinstance(relpath_u, unicode), relpath_u)
 
-    def _process(self, opaque, path, events_mask):
         d = defer.succeed(None)
 
-        # FIXME: if this already exists as a mutable file, we replace the directory entry,
-        # but we should probably modify the file (as the SFTP frontend does).
-        def _add_file(ign):
-            name = path.basename()
-            # on Windows the name is already Unicode
-            if not isinstance(name, unicode):
-                name = name.decode(get_filesystem_encoding())
-
-            u = FileName(path.path, self._convergence)
-            return self._parent.add_file(name, u)
-        d.addCallback(_add_file)
-
-        def _succeeded(ign):
-            self._stats_provider.count('drop_upload.files_queued', -1)
-            self._stats_provider.count('drop_upload.files_uploaded', 1)
-        def _failed(f):
-            self._stats_provider.count('drop_upload.files_queued', -1)
-            if path.exists():
-                self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
-                self._stats_provider.count('drop_upload.files_failed', 1)
-                return f
+        def _maybe_upload(val):
+            fp = self._get_filepath(relpath_u)
+            pathinfo = get_pathinfo(unicode_from_filepath(fp))
+
+            print "pending = %r, about to remove %r" % (self._pending, relpath_u)
+            self._pending.remove(relpath_u)
+            encoded_path_u = magicpath.path2magic(relpath_u)
+
+            if not pathinfo.exists:
+                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
+                self._count('objects_disappeared')
+                d2 = defer.succeed(None)
+                if self._db.check_file_db_exists(relpath_u):
+                    d2.addCallback(lambda ign: self._get_metadata(encoded_path_u))
+                    current_version = self._db.get_local_file_version(relpath_u) + 1
+                    def set_deleted(metadata):
+                        metadata['version'] = current_version
+                        metadata['deleted'] = True
+                        empty_uploadable = Data("", self._client.convergence)
+                        return self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, overwrite=True, metadata=metadata)
+                    d2.addCallback(set_deleted)
+                    def add_db_entry(filenode):
+                        filecap = filenode.get_uri()
+                        self._db.did_upload_version(filecap, relpath_u, current_version, pathinfo)
+                        self._count('files_uploaded')
+                    # FIXME consider whether it's correct to retrieve the filenode again.
+                    d2.addCallback(lambda x: self._get_filenode(encoded_path_u))
+                    d2.addCallback(add_db_entry)
+
+                d2.addCallback(lambda x: Exception("file does not exist"))  # FIXME wrong
+                return d2
+            elif pathinfo.islink:
+                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
+                return None
+            elif pathinfo.isdir:
+                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
+                uploadable = Data("", self._client.convergence)
+                encoded_path_u += magicpath.path2magic(u"/")
+                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
+                def _succeeded(ign):
+                    self._log("created subdirectory %r" % (relpath_u,))
+                    self._count('directories_created')
+                def _failed(f):
+                    self._log("failed to create subdirectory %r" % (relpath_u,))
+                    return f
+                upload_d.addCallbacks(_succeeded, _failed)
+                upload_d.addCallback(lambda ign: self._scan(relpath_u))
+                return upload_d
+            elif pathinfo.isfile:
+                version = self._db.get_local_file_version(relpath_u)
+                if version is None:
+                    version = 0
+                elif self._db.is_new_file(pathinfo, relpath_u):
+                    version += 1
+                else:
+                    return None
+
+                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
+                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":version}, overwrite=True)
+                def add_db_entry(filenode):
+                    filecap = filenode.get_uri()
+                    self._db.did_upload_version(filecap, relpath_u, version, pathinfo)
+                    self._count('files_uploaded')
+                d2.addCallback(add_db_entry)
+                return d2
             else:
-                self._log("drop-upload: notified file %r disappeared "
-                          "(this is normal for temporary files): %r" % (path.path, f))
-                self._stats_provider.count('drop_upload.files_disappeared', 1)
+                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                 return None
+
+        d.addCallback(_maybe_upload)
+
+        def _succeeded(res):
+            self._count('objects_succeeded')
+            return res
+        def _failed(f):
+            print f
+            self._count('objects_failed')
+            self._log("%r while processing %r" % (f, relpath_u))
+            return f
         d.addCallbacks(_succeeded, _failed)
-        d.addBoth(self._uploaded_callback)
         return d
 
-    def set_uploaded_callback(self, callback):
-        """This sets a function that will be called after a file has been uploaded."""
-        self._uploaded_callback = callback
+    def _get_metadata(self, encoded_path_u):
+        try:
+            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
+        except KeyError:
+            return Failure()
+        return d
 
-    def finish(self, for_tests=False):
-        self._notifier.stopReading()
-        self._stats_provider.count('drop_upload.dirs_monitored', -1)
-        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
-            return self._notifier.wait_until_stopped()
+    def _get_filenode(self, encoded_path_u):
+        try:
+            d = self._upload_dirnode.get(encoded_path_u)
+        except KeyError:
+            return Failure()
+        return d
+
+
+class Downloader(QueueMixin):
+    def __init__(self, client, local_path_u, db, collective_dircap, clock):
+        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
+
+        # TODO: allow a path rather than a cap URI.
+        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)
+
+        if not IDirectoryNode.providedBy(self._collective_dirnode):
+            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
+        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
+            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
+
+        self._turn_delay = 3 # delay between remote scans
+        self._download_scan_batch = {} # path -> [(filenode, metadata)]
+
+    def start_scanning(self):
+        self._log("\nstart_scanning")
+        files = self._db.get_all_relpaths()
+        self._log("all files %s" % files)
+
+        d = self._scan_remote_collective()
+        self._turn_deque()
+        return d
+
+    def stop(self):
+        self._stopped = True
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self._lazy_tail)
+        return d
+
+    def _should_download(self, relpath_u, remote_version):
+        """
+        _should_download returns a bool indicating whether or not a remote object should be downloaded.
+        We check the remote metadata version against our magic-folder db version number;
+        latest version wins.
+        """
+        if magicpath.should_ignore_file(relpath_u):
+            return False
+        v = self._db.get_local_file_version(relpath_u)
+        return (v is None or v < remote_version)
+
+    def _get_local_latest(self, relpath_u):
+        """
+        _get_local_latest takes a unicode path string checks to see if this file object
+        exists in our magic-folder db; if not then return None
+        else check for an entry in our magic-folder db and return the version number.
+        """
+        if not self._get_filepath(relpath_u).exists():
+            return None
+        return self._db.get_local_file_version(relpath_u)
+
+    def _get_collective_latest_file(self, filename):
+        """
+        _get_collective_latest_file takes a file path pointing to a file managed by
+        magic-folder and returns a deferred that fires with the two tuple containing a
+        file node and metadata for the latest version of the file located in the
+        magic-folder collective directory.
+        """
+        collective_dirmap_d = self._collective_dirnode.list()
+        def scan_collective(result):
+            list_of_deferreds = []
+            for dir_name in result.keys():
+                # XXX make sure it's a directory
+                d = defer.succeed(None)
+                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
+                list_of_deferreds.append(d)
+            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
+            return deferList
+        collective_dirmap_d.addCallback(scan_collective)
+        def highest_version(deferredList):
+            max_version = 0
+            metadata = None
+            node = None
+            for success, result in deferredList:
+                if success:
+                    if result[1]['version'] > max_version:
+                        node, metadata = result
+                        max_version = result[1]['version']
+            return node, metadata
+        collective_dirmap_d.addCallback(highest_version)
+        return collective_dirmap_d
+
+    def _append_to_batch(self, name, file_node, metadata):
+        if self._download_scan_batch.has_key(name):
+            self._download_scan_batch[name] += [(file_node, metadata)]
         else:
-            return defer.succeed(None)
+            self._download_scan_batch[name] = [(file_node, metadata)]
 
-    def _log(self, msg):
-        self._client.log(msg)
-        #open("events", "ab+").write(msg)
+    def _scan_remote(self, nickname, dirnode):
+        self._log("_scan_remote nickname %r" % (nickname,))
+        d = dirnode.list()
+        def scan_listing(listing_map):
+            for name in listing_map.keys():
+                file_node, metadata = listing_map[name]
+                local_version = self._get_local_latest(name)
+                remote_version = metadata.get('version', None)
+                self._log("%r has local version %r, remote version %r" % (name, local_version, remote_version))
+                if local_version is None or remote_version is None or local_version < remote_version:
+                    self._log("added to download queue\n")
+                    self._append_to_batch(name, file_node, metadata)
+        d.addCallback(scan_listing)
+        return d
+
+    def _scan_remote_collective(self):
+        self._log("_scan_remote_collective")
+        self._download_scan_batch = {} # XXX
+
+        if self._collective_dirnode is None:
+            return
+        collective_dirmap_d = self._collective_dirnode.list()
+        def do_list(result):
+            others = [x for x in result.keys()]
+            return result, others
+        collective_dirmap_d.addCallback(do_list)
+        def scan_collective(result):
+            d = defer.succeed(None)
+            collective_dirmap, others_list = result
+            for dir_name in others_list:
+                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
+                # XXX todo add errback
+            return d
+        collective_dirmap_d.addCallback(scan_collective)
+        collective_dirmap_d.addCallback(self._filter_scan_batch)
+        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
+        return collective_dirmap_d
+
+    def _add_batch_to_download_queue(self, result):
+        print "result = %r" % (result,)
+        print "deque = %r" % (self._deque,)
+        self._deque.extend(result)
+        print "deque after = %r" % (self._deque,)
+        self._count('objects_queued', len(result))
+        print "pending = %r" % (self._pending,)
+        self._pending.update(map(lambda x: x[0], result))
+        print "pending after = %r" % (self._pending,)
+
+    def _filter_scan_batch(self, result):
+        extension = [] # consider whether this should be a dict
+        for relpath_u in self._download_scan_batch.keys():
+            if relpath_u in self._pending:
+                continue
+            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
+            if self._should_download(relpath_u, metadata['version']):
+                extension += [(relpath_u, file_node, metadata)]
+        return extension
+
+    def _when_queue_is_empty(self):
+        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
+        d.addCallback(lambda ign: self._turn_deque())
+        return d
+
+    def _process(self, item):
+        (relpath_u, file_node, metadata) = item
+        d = file_node.download_best_version()
+        def succeeded(res):
+            fp = self._get_filepath(relpath_u)
+            abspath_u = unicode_from_filepath(fp)
+            d2 = defer.succeed(res)
+            d2.addCallback(lambda result: self._write_downloaded_file(abspath_u, result, is_conflict=False))
+            def do_update_db(written_abspath_u):
+                filecap = file_node.get_uri()
+                written_pathinfo = get_pathinfo(written_abspath_u)
+                if not written_pathinfo.exists:
+                    raise Exception("downloaded file %s disappeared" % quote_local_unicode_path(written_abspath_u))
+
+                self._db.did_upload_version(filecap, relpath_u, metadata['version'], written_pathinfo)
+            d2.addCallback(do_update_db)
+            # XXX handle failure here with addErrback...
+            self._count('objects_downloaded')
+            return d2
+        def failed(f):
+            self._log("download failed: %s" % (str(f),))
+            self._count('objects_download_failed')
+            return f
+        d.addCallbacks(succeeded, failed)
+        def remove_from_pending(res):
+            self._pending.remove(relpath_u)
+            return res
+        d.addBoth(remove_from_pending)
+        return d
+
+    FUDGE_SECONDS = 10.0
+
+    @classmethod
+    def _write_downloaded_file(cls, abspath_u, file_contents, is_conflict=False, now=None):
+        # 1. Write a temporary file, say .foo.tmp.
+        # 2. is_conflict determines whether this is an overwrite or a conflict.
+        # 3. Set the mtime of the replacement file to be T seconds before the
+        #    current local time.
+        # 4. Perform a file replacement with backup filename foo.backup,
+        #    replaced file foo, and replacement file .foo.tmp. If any step of
+        #    this operation fails, reclassify as a conflict and stop.
+        #
+        # Returns the path of the destination file.
+
+        precondition_abspath(abspath_u)
+        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
+        backup_path_u = abspath_u + u".backup"
+        if now is None:
+            now = time.time()
+
+        fileutil.write(replacement_path_u, file_contents)
+        os.utime(replacement_path_u, (now, now - cls.FUDGE_SECONDS))
+        if is_conflict:
+            return cls._rename_conflicted_file(abspath_u, replacement_path_u)
+        else:
+            try:
+                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
+                return abspath_u
+            except fileutil.ConflictError:
+                return cls._rename_conflicted_file(abspath_u, replacement_path_u)
+
+    @classmethod
+    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
+        conflict_path_u = abspath_u + u".conflict"
+        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
+        return conflict_path_u
diff --git a/src/allmydata/magicpath.py b/src/allmydata/magicpath.py
new file mode 100644
index 00000000..ba15ed58
--- /dev/null
+++ b/src/allmydata/magicpath.py
@@ -0,0 +1,27 @@
+
+import re
+import os.path
+
+from allmydata.util.assertutil import precondition
+
+def path2magic(path):
+    return re.sub(ur'[/@]',  lambda m: {u'/': u'@_', u'@': u'@@'}[m.group(0)], path)
+
+def magic2path(path):
+    return re.sub(ur'@[_@]', lambda m: {u'@_': u'/', u'@@': u'@'}[m.group(0)], path)
+
+
+IGNORE_SUFFIXES = [u'.backup', u'.tmp', u'.conflicted']
+IGNORE_PREFIXES = [u'.']
+
+def should_ignore_file(path_u):
+    precondition(isinstance(path_u, unicode), path_u=path_u)
+
+    for suffix in IGNORE_SUFFIXES:
+        if path_u.endswith(suffix):
+            return True
+    while path_u != u"":
+        path_u, tail_u = os.path.split(path_u)
+        if tail_u.startswith(u"."):
+            return True
+    return False
diff --git a/src/allmydata/scripts/common.py b/src/allmydata/scripts/common.py
index d6246fc0..3bfe9750 100644
--- a/src/allmydata/scripts/common.py
+++ b/src/allmydata/scripts/common.py
@@ -57,9 +57,14 @@ class BasedirOptions(BaseOptions):
     ]
 
     def parseArgs(self, basedir=None):
-        if self.parent['node-directory'] and self['basedir']:
+        # This finds the node-directory option correctly even if we are in a subcommand.
+        root = self.parent
+        while root.parent is not None:
+            root = root.parent
+
+        if root['node-directory'] and self['basedir']:
             raise usage.UsageError("The --node-directory (or -d) and --basedir (or -C) options cannot both be used.")
-        if self.parent['node-directory'] and basedir:
+        if root['node-directory'] and basedir:
             raise usage.UsageError("The --node-directory (or -d) option and a basedir argument cannot both be used.")
         if self['basedir'] and basedir:
             raise usage.UsageError("The --basedir (or -C) option and a basedir argument cannot both be used.")
@@ -68,13 +73,14 @@ class BasedirOptions(BaseOptions):
             b = argv_to_abspath(basedir)
         elif self['basedir']:
             b = argv_to_abspath(self['basedir'])
-        elif self.parent['node-directory']:
-            b = argv_to_abspath(self.parent['node-directory'])
+        elif root['node-directory']:
+            b = argv_to_abspath(root['node-directory'])
         elif self.default_nodedir:
             b = self.default_nodedir
         else:
             raise usage.UsageError("No default basedir available, you must provide one with --node-directory, --basedir, or a basedir argument")
         self['basedir'] = b
+        self['node-directory'] = b
 
     def postOptions(self):
         if not self['basedir']:
diff --git a/src/allmydata/scripts/magic_folder_cli.py b/src/allmydata/scripts/magic_folder_cli.py
new file mode 100644
index 00000000..1cf20c97
--- /dev/null
+++ b/src/allmydata/scripts/magic_folder_cli.py
@@ -0,0 +1,182 @@
+
+import os
+from cStringIO import StringIO
+from twisted.python import usage
+
+from .common import BaseOptions, BasedirOptions, get_aliases
+from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions
+import tahoe_mv
+from allmydata.util import fileutil
+from allmydata import uri
+
+INVITE_SEPARATOR = "+"
+
+class CreateOptions(BasedirOptions):
+    nickname = None
+    localdir = None
+    synopsis = "MAGIC_ALIAS: [NICKNAME LOCALDIR]"
+    def parseArgs(self, alias, nickname=None, localdir=None):
+        BasedirOptions.parseArgs(self)
+        if not alias.endswith(':'):
+            raise usage.UsageError("An alias must end with a ':' character.")
+        self.alias = alias[:-1]
+        self.nickname = nickname
+        self.localdir = localdir
+        if self.nickname and not self.localdir:
+            raise usage.UsageError("If NICKNAME is specified then LOCALDIR must also be specified.")
+        node_url_file = os.path.join(self['node-directory'], "node.url")
+        self['node-url'] = fileutil.read(node_url_file).strip()
+
+def _delegate_options(source_options, target_options):
+    target_options.aliases = get_aliases(source_options['node-directory'])
+    target_options["node-url"] = source_options["node-url"]
+    target_options["node-directory"] = source_options["node-directory"]
+    target_options.stdin = StringIO("")
+    target_options.stdout = StringIO()
+    target_options.stderr = StringIO()
+    return target_options
+
+def create(options):
+    from allmydata.scripts import tahoe_add_alias
+    create_alias_options = _delegate_options(options, CreateAliasOptions())
+    create_alias_options.alias = options.alias
+
+    rc = tahoe_add_alias.create_alias(create_alias_options)
+    if rc != 0:
+        print >>options.stderr, create_alias_options.stderr.getvalue()
+        return rc
+    print >>options.stdout, create_alias_options.stdout.getvalue()
+
+    if options.nickname is not None:
+        invite_options = _delegate_options(options, InviteOptions())
+        invite_options.alias = options.alias
+        invite_options.nickname = options.nickname
+        rc = invite(invite_options)
+        if rc != 0:
+            print >>options.stderr, "magic-folder: failed to invite after create\n"
+            print >>options.stderr, invite_options.stderr.getvalue()
+            return rc
+        invite_code = invite_options.stdout.getvalue().strip()
+
+        join_options = _delegate_options(options, JoinOptions())
+        join_options.invite_code = invite_code
+        fields = invite_code.split(INVITE_SEPARATOR)
+        if len(fields) != 2:
+            raise usage.UsageError("Invalid invite code.")
+        join_options.magic_readonly_cap, join_options.dmd_write_cap = fields
+        join_options.local_dir = options.localdir
+        rc = join(join_options)
+        if rc != 0:
+            print >>options.stderr, "magic-folder: failed to join after create\n"
+            print >>options.stderr, join_options.stderr.getvalue()
+            return rc
+    return 0
+
+class InviteOptions(BasedirOptions):
+    nickname = None
+    synopsis = "MAGIC_ALIAS: NICKNAME"
+    stdin = StringIO("")
+    def parseArgs(self, alias, nickname=None):
+        BasedirOptions.parseArgs(self)
+        if not alias.endswith(':'):
+            raise usage.UsageError("An alias must end with a ':' character.")
+        self.alias = alias[:-1]
+        self.nickname = nickname
+        node_url_file = os.path.join(self['node-directory'], "node.url")
+        self['node-url'] = open(node_url_file, "r").read().strip()
+        aliases = get_aliases(self['node-directory'])
+        self.aliases = aliases
+
+def invite(options):
+    from allmydata.scripts import tahoe_mkdir
+    mkdir_options = _delegate_options(options, MakeDirectoryOptions())
+    mkdir_options.where = None
+
+    rc = tahoe_mkdir.mkdir(mkdir_options)
+    if rc != 0:
+        print >>options.stderr, "magic-folder: failed to mkdir\n"
+        return rc
+    dmd_write_cap = mkdir_options.stdout.getvalue().strip()
+    dmd_readonly_cap = unicode(uri.from_string(dmd_write_cap).get_readonly().to_string(), 'utf-8')
+    if dmd_readonly_cap is None:
+        print >>options.stderr, "magic-folder: failed to diminish dmd write cap\n"
+        return 1
+
+    magic_write_cap = get_aliases(options["node-directory"])[options.alias]
+    magic_readonly_cap = unicode(uri.from_string(magic_write_cap).get_readonly().to_string(), 'utf-8')
+    # tahoe ln CLIENT_READCAP COLLECTIVE_WRITECAP/NICKNAME
+    ln_options = _delegate_options(options, LnOptions())
+    ln_options.from_file = dmd_readonly_cap
+    ln_options.to_file = "%s/%s" % (magic_write_cap, options.nickname)
+    rc = tahoe_mv.mv(ln_options, mode="link")
+    if rc != 0:
+        print >>options.stderr, "magic-folder: failed to create link\n"
+        print >>options.stderr, ln_options.stderr.getvalue()
+        return rc
+
+    print >>options.stdout, "%s%s%s" % (magic_readonly_cap, INVITE_SEPARATOR, dmd_write_cap)
+    return 0
+
+class JoinOptions(BasedirOptions):
+    synopsis = "INVITE_CODE LOCAL_DIR"
+    dmd_write_cap = ""
+    magic_readonly_cap = ""
+    def parseArgs(self, invite_code, local_dir):
+        BasedirOptions.parseArgs(self)
+        self.local_dir = local_dir
+        fields = invite_code.split(INVITE_SEPARATOR)
+        if len(fields) != 2:
+            raise usage.UsageError("Invalid invite code.")
+        self.magic_readonly_cap, self.dmd_write_cap = fields
+
+def join(options):
+    dmd_cap_file = os.path.join(options["node-directory"], "private/magic_folder_dircap")
+    collective_readcap_file = os.path.join(options["node-directory"], "private/collective_dircap")
+
+    fileutil.write(dmd_cap_file, options.dmd_write_cap)
+    fileutil.write(collective_readcap_file, options.magic_readonly_cap)
+    fileutil.write(os.path.join(options["node-directory"], "tahoe.cfg"),
+                   "[magic_folder]\nenabled = True\nlocal.directory = %s\n"
+                   % (options.local_dir.encode('utf-8'),), mode="ab")
+    return 0
+
+class MagicFolderCommand(BaseOptions):
+    subCommands = [
+        ["create", None, CreateOptions, "Create a Magic Folder."],
+        ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
+        ["join", None, JoinOptions, "Join a Magic Folder."],
+    ]
+    def postOptions(self):
+        if not hasattr(self, 'subOptions'):
+            raise usage.UsageError("must specify a subcommand")
+    def getSynopsis(self):
+        return "Usage: tahoe [global-options] magic SUBCOMMAND"
+    def getUsage(self, width=None):
+        t = BaseOptions.getUsage(self, width)
+        t += """\
+Please run e.g. 'tahoe magic-folder create --help' for more details on each
+subcommand.
+"""
+        return t
+
+subDispatch = {
+    "create": create,
+    "invite": invite,
+    "join": join,
+}
+
+def do_magic_folder(options):
+    so = options.subOptions
+    so.stdout = options.stdout
+    so.stderr = options.stderr
+    f = subDispatch[options.subCommand]
+    return f(so)
+
+subCommands = [
+    ["magic-folder", None, MagicFolderCommand,
+     "Magic Folder subcommands: use 'tahoe magic-folder' for a list."],
+]
+
+dispatch = {
+    "magic-folder": do_magic_folder,
+}
diff --git a/src/allmydata/scripts/runner.py b/src/allmydata/scripts/runner.py
index c331eee7..a029b34a 100644
--- a/src/allmydata/scripts/runner.py
+++ b/src/allmydata/scripts/runner.py
@@ -5,7 +5,8 @@ from cStringIO import StringIO
 from twisted.python import usage
 
 from allmydata.scripts.common import get_default_nodedir
-from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin
+from allmydata.scripts import debug, create_node, startstop_node, cli, keygen, stats_gatherer, admin, \
+magic_folder_cli
 from allmydata.util.encodingutil import quote_output, quote_local_unicode_path, get_io_encoding
 
 def GROUP(s):
@@ -45,6 +46,7 @@ class Options(usage.Options):
                     +   debug.subCommands
                     + GROUP("Using the filesystem")
                     +   cli.subCommands
+                    +   magic_folder_cli.subCommands
                     )
 
     optFlags = [
@@ -143,6 +145,8 @@ def runner(argv,
         rc = admin.dispatch[command](so)
     elif command in cli.dispatch:
         rc = cli.dispatch[command](so)
+    elif command in magic_folder_cli.dispatch:
+        rc = magic_folder_cli.dispatch[command](so)
     elif command in ac_dispatch:
         rc = ac_dispatch[command](so, stdout, stderr)
     else:
diff --git a/src/allmydata/scripts/tahoe_ls.py b/src/allmydata/scripts/tahoe_ls.py
index 78eea1f2..9a8ce240 100644
--- a/src/allmydata/scripts/tahoe_ls.py
+++ b/src/allmydata/scripts/tahoe_ls.py
@@ -151,9 +151,7 @@ def list(options):
             line.append(uri)
         if options["readonly-uri"]:
             line.append(quote_output(ro_uri or "-", quotemarks=False))
-
         rows.append((encoding_error, line))
-
     max_widths = []
     left_justifys = []
     for (encoding_error, row) in rows:
diff --git a/src/allmydata/test/no_network.py b/src/allmydata/test/no_network.py
index 8dd9a2f9..2dc59381 100644
--- a/src/allmydata/test/no_network.py
+++ b/src/allmydata/test/no_network.py
@@ -20,6 +20,9 @@ from twisted.internet import defer, reactor
 from twisted.python.failure import Failure
 from foolscap.api import Referenceable, fireEventually, RemoteException
 from base64 import b32encode
+
+from allmydata.util.assertutil import _assert
+
 from allmydata import uri as tahoe_uri
 from allmydata.client import Client
 from allmydata.storage.server import StorageServer, storage_index_to_dir
@@ -174,6 +177,9 @@ class NoNetworkStorageBroker:
         return None
 
 class NoNetworkClient(Client):
+
+    def disownServiceParent(self):
+        self.disownServiceParent()
     def create_tub(self):
         pass
     def init_introducer_client(self):
@@ -232,6 +238,7 @@ class NoNetworkGrid(service.MultiService):
         self.proxies_by_id = {} # maps to IServer on which .rref is a wrapped
                                 # StorageServer
         self.clients = []
+        self.client_config_hooks = client_config_hooks
 
         for i in range(num_servers):
             ss = self.make_server(i)
@@ -239,30 +246,42 @@ class NoNetworkGrid(service.MultiService):
         self.rebuild_serverlist()
 
         for i in range(num_clients):
-            clientid = hashutil.tagged_hash("clientid", str(i))[:20]
-            clientdir = os.path.join(basedir, "clients",
-                                     idlib.shortnodeid_b2a(clientid))
-            fileutil.make_dirs(clientdir)
-            f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
+            c = self.make_client(i)
+            self.clients.append(c)
+
+    def make_client(self, i, write_config=True):
+        clientid = hashutil.tagged_hash("clientid", str(i))[:20]
+        clientdir = os.path.join(self.basedir, "clients",
+                                 idlib.shortnodeid_b2a(clientid))
+        fileutil.make_dirs(clientdir)
+
+        tahoe_cfg_path = os.path.join(clientdir, "tahoe.cfg")
+        if write_config:
+            f = open(tahoe_cfg_path, "w")
             f.write("[node]\n")
             f.write("nickname = client-%d\n" % i)
             f.write("web.port = tcp:0:interface=127.0.0.1\n")
             f.write("[storage]\n")
             f.write("enabled = false\n")
             f.close()
-            c = None
-            if i in client_config_hooks:
-                # this hook can either modify tahoe.cfg, or return an
-                # entirely new Client instance
-                c = client_config_hooks[i](clientdir)
-            if not c:
-                c = NoNetworkClient(clientdir)
-                c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
-            c.nodeid = clientid
-            c.short_nodeid = b32encode(clientid).lower()[:8]
-            c._servers = self.all_servers # can be updated later
-            c.setServiceParent(self)
-            self.clients.append(c)
+        else:
+            _assert(os.path.exists(tahoe_cfg_path), tahoe_cfg_path=tahoe_cfg_path)
+
+        c = None
+        if i in self.client_config_hooks:
+            # this hook can either modify tahoe.cfg, or return an
+            # entirely new Client instance
+            c = self.client_config_hooks[i](clientdir)
+
+        if not c:
+            c = NoNetworkClient(clientdir)
+            c.set_default_mutable_keysize(TEST_RSA_KEY_SIZE)
+
+        c.nodeid = clientid
+        c.short_nodeid = b32encode(clientid).lower()[:8]
+        c._servers = self.all_servers # can be updated later
+        c.setServiceParent(self)
+        return c
 
     def make_server(self, i, readonly=False):
         serverid = hashutil.tagged_hash("serverid", str(i))[:20]
@@ -350,6 +369,9 @@ class GridTestMixin:
                                num_servers=num_servers,
                                client_config_hooks=client_config_hooks)
         self.g.setServiceParent(self.s)
+        self._record_webports_and_baseurls()
+
+    def _record_webports_and_baseurls(self):
         self.client_webports = [c.getServiceNamed("webish").getPortnum()
                                 for c in self.g.clients]
         self.client_baseurls = [c.getServiceNamed("webish").getURL()
@@ -358,6 +380,23 @@ class GridTestMixin:
     def get_clientdir(self, i=0):
         return self.g.clients[i].basedir
 
+    def set_clientdir(self, basedir, i=0):
+        self.g.clients[i].basedir = basedir
+
+    def get_client(self, i=0):
+        return self.g.clients[i]
+
+    def restart_client(self, i=0):
+        client = self.g.clients[i]
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self.g.removeService(client))
+        def _make_client(ign):
+            c = self.g.make_client(i, write_config=False)
+            self.g.clients[i] = c
+            self._record_webports_and_baseurls()
+        d.addCallback(_make_client)
+        return d
+
     def get_serverdir(self, i):
         return self.g.servers_by_number[i].storedir
 
diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py
index b59a2b1b..9cfaf659 100644
--- a/src/allmydata/test/test_cli.py
+++ b/src/allmydata/test/test_cli.py
@@ -49,8 +49,11 @@ def parse_options(basedir, command, args):
 
 class CLITestMixin(ReallyEqualMixin):
     def do_cli(self, verb, *args, **kwargs):
+        # client_num is used to execute client CLI commands on a specific client.
+        client_num = kwargs.get("client_num", 0)
+
         nodeargs = [
-            "--node-directory", self.get_clientdir(),
+            "--node-directory", self.get_clientdir(i=client_num),
             ]
         argv = nodeargs + [verb] + list(args)
         stdin = kwargs.get("stdin", "")
diff --git a/src/allmydata/test/test_cli_backup.py b/src/allmydata/test/test_cli_backup.py
index 3bd2a614..a48295de 100644
--- a/src/allmydata/test/test_cli_backup.py
+++ b/src/allmydata/test/test_cli_backup.py
@@ -11,7 +11,8 @@ from allmydata.util import fileutil
 from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv
 from allmydata.util.namespace import Namespace
-from allmydata.scripts import cli, backupdb
+from allmydata.scripts import cli
+from allmydata import backupdb
 from .common_util import StallMixin
 from .no_network import GridTestMixin
 from .test_cli import CLITestMixin, parse_options
diff --git a/src/allmydata/test/test_cli_magic_folder.py b/src/allmydata/test/test_cli_magic_folder.py
new file mode 100644
index 00000000..1ba831ac
--- /dev/null
+++ b/src/allmydata/test/test_cli_magic_folder.py
@@ -0,0 +1,206 @@
+import os.path
+import re
+
+from twisted.trial import unittest
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from allmydata.util import fileutil
+from allmydata.scripts.common import get_aliases
+from allmydata.test.no_network import GridTestMixin
+from .test_cli import CLITestMixin
+from allmydata.scripts import magic_folder_cli
+from allmydata.util.fileutil import abspath_expanduser_unicode
+from allmydata.frontends.magic_folder import MagicFolder
+from allmydata import uri
+
+
+class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
+
+    def do_create_magic_folder(self, client_num):
+        d = self.do_cli("magic-folder", "create", "magic:", client_num=client_num)
+        def _done((rc,stdout,stderr)):
+            self.failUnlessEqual(rc, 0)
+            self.failUnlessIn("Alias 'magic' created", stdout)
+            self.failUnlessEqual(stderr, "")
+            aliases = get_aliases(self.get_clientdir(i=client_num))
+            self.failUnlessIn("magic", aliases)
+            self.failUnless(aliases["magic"].startswith("URI:DIR2:"))
+        d.addCallback(_done)
+        return d
+
+    def do_invite(self, client_num, nickname):
+        d = self.do_cli("magic-folder", "invite", u"magic:", nickname, client_num=client_num)
+        def _done((rc,stdout,stderr)):
+            self.failUnless(rc == 0)
+            return (rc,stdout,stderr)
+        d.addCallback(_done)
+        return d
+
+    def do_join(self, client_num, local_dir, invite_code):
+        magic_readonly_cap, dmd_write_cap = invite_code.split(magic_folder_cli.INVITE_SEPARATOR)
+        d = self.do_cli("magic-folder", "join", invite_code, local_dir, client_num=client_num)
+        def _done((rc,stdout,stderr)):
+            self.failUnless(rc == 0)
+            return (rc,stdout,stderr)
+        d.addCallback(_done)
+        return d
+
+    def check_joined_config(self, client_num, upload_dircap):
+        """Tests that our collective directory has the readonly cap of
+        our upload directory.
+        """
+        collective_readonly_cap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap"))
+        d = self.do_cli("ls", "--json", collective_readonly_cap, client_num=client_num)
+        def _done((rc,stdout,stderr)):
+            self.failUnless(rc == 0)
+            return (rc,stdout,stderr)
+        d.addCallback(_done)
+        def test_joined_magic_folder((rc,stdout,stderr)):
+            readonly_cap = unicode(uri.from_string(upload_dircap).get_readonly().to_string(), 'utf-8')
+            s = re.search(readonly_cap, stdout)
+            self.failUnless(s is not None)
+            return None
+        d.addCallback(test_joined_magic_folder)
+        return d
+
+    def get_caps_from_files(self, client_num):
+        collective_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/collective_dircap"))
+        upload_dircap = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private/magic_folder_dircap"))
+        self.failIf(collective_dircap is None or upload_dircap is None)
+        return collective_dircap, upload_dircap
+
+    def check_config(self, client_num, local_dir):
+        client_config = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "tahoe.cfg"))
+        # XXX utf-8?
+        local_dir = local_dir.encode('utf-8')
+        ret = re.search("\[magic_folder\]\nenabled = True\nlocal.directory = %s" % (local_dir,), client_config)
+        self.failIf(ret is None)
+
+    def create_invite_join_magic_folder(self, nickname, local_dir):
+        d = self.do_cli("magic-folder", "create", u"magic:", nickname, local_dir)
+        def _done((rc,stdout,stderr)):
+            self.failUnless(rc == 0)
+            return (rc,stdout,stderr)
+        d.addCallback(_done)
+        def get_alice_caps(x):
+            client = self.get_client()
+            self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+            self.collective_dirnode = client.create_node_from_uri(self.collective_dircap)
+            self.upload_dirnode     = client.create_node_from_uri(self.upload_dircap)
+        d.addCallback(get_alice_caps)
+        d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+        d.addCallback(lambda x: self.check_config(0, local_dir))
+        return d
+
+    def cleanup(self, res):
+        #print "cleanup", res
+        d = defer.succeed(None)
+        if self.magicfolder is not None:
+            d.addCallback(lambda ign: self.magicfolder.finish())
+        d.addCallback(lambda ign: res)
+        return d
+
+    def init_magicfolder(self, client_num, upload_dircap, collective_dircap, local_magic_dir, clock):
+        dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.get_clientdir(i=client_num))
+        magicfolder = MagicFolder(self.get_client(client_num), upload_dircap, collective_dircap, local_magic_dir,
+                                       dbfile, pending_delay=0.2, clock=clock)
+        magicfolder.setServiceParent(self.get_client(client_num))
+        magicfolder.ready()
+        return magicfolder
+
+    def setup_alice_and_bob(self, clock=reactor):
+        self.set_up_grid(num_clients=2)
+
+        alice_magic_dir = abspath_expanduser_unicode(u"Alice-magic", base=self.basedir)
+        self.mkdir_nonascii(alice_magic_dir)
+        bob_magic_dir = abspath_expanduser_unicode(u"Bob-magic", base=self.basedir)
+        self.mkdir_nonascii(bob_magic_dir)
+
+        # Alice creates a Magic Folder,
+        # invites herself then and joins.
+        d = self.do_create_magic_folder(0)
+        d.addCallback(lambda x: self.do_invite(0, u"Alice\u00F8"))
+        def get_invitecode(result):
+            self.invitecode = result[1].strip()
+        d.addCallback(get_invitecode)
+        d.addCallback(lambda x: self.do_join(0, alice_magic_dir, self.invitecode))
+        def get_alice_caps(x):
+            self.alice_collective_dircap, self.alice_upload_dircap = self.get_caps_from_files(0)
+        d.addCallback(get_alice_caps)
+        d.addCallback(lambda x: self.check_joined_config(0, self.alice_upload_dircap))
+        d.addCallback(lambda x: self.check_config(0, alice_magic_dir))
+        def get_Alice_magicfolder(result):
+            self.alice_magicfolder = self.init_magicfolder(0, self.alice_upload_dircap, self.alice_collective_dircap, alice_magic_dir, clock)
+            return result
+        d.addCallback(get_Alice_magicfolder)
+
+        # Alice invites Bob. Bob joins.
+        d.addCallback(lambda x: self.do_invite(0, u"Bob\u00F8"))
+        def get_invitecode(result):
+            self.invitecode = result[1].strip()
+        d.addCallback(get_invitecode)
+        d.addCallback(lambda x: self.do_join(1, bob_magic_dir, self.invitecode))
+        def get_bob_caps(x):
+            self.bob_collective_dircap, self.bob_upload_dircap = self.get_caps_from_files(1)
+        d.addCallback(get_bob_caps)
+        d.addCallback(lambda x: self.check_joined_config(1, self.bob_upload_dircap))
+        d.addCallback(lambda x: self.check_config(1, bob_magic_dir))
+        def get_Bob_magicfolder(result):
+            self.bob_magicfolder = self.init_magicfolder(1, self.bob_upload_dircap, self.bob_collective_dircap, bob_magic_dir, clock)
+            return result
+        d.addCallback(get_Bob_magicfolder)
+
+        def prepare_result(result):
+            # XXX improve this
+            return (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder,
+                    self.bob_collective_dircap,   self.bob_upload_dircap,   self.bob_magicfolder)
+        d.addCallback(prepare_result)
+        return d
+
+
+class CreateMagicFolder(MagicFolderCLITestMixin, unittest.TestCase):
+
+    def test_create_and_then_invite_join(self):
+        self.basedir = "cli/MagicFolder/create-and-then-invite-join"
+        self.set_up_grid()
+        self.local_dir = os.path.join(self.basedir, "magic")
+        d = self.do_create_magic_folder(0)
+        d.addCallback(lambda x: self.do_invite(0, u"Alice"))
+        def get_invite((rc,stdout,stderr)):
+            self.invite_code = stdout.strip()
+        d.addCallback(get_invite)
+        d.addCallback(lambda x: self.do_join(0, self.local_dir, self.invite_code))
+        def get_caps(x):
+            self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+        d.addCallback(get_caps)
+        d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+        d.addCallback(lambda x: self.check_config(0, self.local_dir))
+        return d
+
+    def test_create_error(self):
+        self.basedir = "cli/MagicFolder/create-error"
+        self.set_up_grid()
+        self.local_dir = os.path.join(self.basedir, "magic")
+        d = self.do_cli("magic-folder", "create", "m a g i c:", client_num=0)
+        def _done((rc,stdout,stderr)):
+            self.failIfEqual(rc, 0)
+            self.failUnlessIn("Alias names cannot contain spaces.", stderr)
+        d.addCallback(_done)
+        return d
+
+    def test_create_invite_join(self):
+        self.basedir = "cli/MagicFolder/create-invite-join"
+        self.set_up_grid()
+        self.local_dir = os.path.join(self.basedir, "magic")
+        d = self.do_cli("magic-folder", "create", u"magic:", u"Alice", self.local_dir)
+        def _done((rc,stdout,stderr)):
+            self.failUnless(rc == 0)
+            return (rc,stdout,stderr)
+        d.addCallback(_done)
+        def get_caps(x):
+            self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
+        d.addCallback(get_caps)
+        d.addCallback(lambda x: self.check_joined_config(0, self.upload_dircap))
+        d.addCallback(lambda x: self.check_config(0, self.local_dir))
+        return d
diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py
index e2a1e9a3..cf17713f 100644
--- a/src/allmydata/test/test_client.py
+++ b/src/allmydata/test/test_client.py
@@ -306,15 +306,19 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
         class MockMagicFolder(service.MultiService):
             name = 'magic-folder'
 
-            def __init__(self, client, upload_dircap, local_dir, dbfile, inotify=None,
+            def __init__(self, client, upload_dircap, collective_dircap, local_dir, dbfile, inotify=None,
                          pending_delay=1.0):
                 service.MultiService.__init__(self)
                 self.client = client
                 self.upload_dircap = upload_dircap
+                self.collective_dircap = collective_dircap
                 self.local_dir = local_dir
                 self.dbfile = dbfile
                 self.inotify = inotify
 
+            def ready(self):
+                pass
+
         self.patch(allmydata.frontends.magic_folder, 'MagicFolder', MockMagicFolder)
 
         upload_dircap = "URI:DIR2:blah"
@@ -328,12 +332,14 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
 
         basedir1 = "test_client.Basic.test_create_magic_folder_service1"
         os.mkdir(basedir1)
+
         fileutil.write(os.path.join(basedir1, "tahoe.cfg"),
                        config + "local.directory = " + local_dir_utf8 + "\n")
         self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1)
 
         fileutil.write(os.path.join(basedir1, "tahoe.cfg"), config)
         fileutil.write(os.path.join(basedir1, "private", "magic_folder_dircap"), "URI:DIR2:blah")
+        fileutil.write(os.path.join(basedir1, "private", "collective_dircap"), "URI:DIR2:meow")
         self.failUnlessRaises(MissingConfigEntry, client.Client, basedir1)
 
         fileutil.write(os.path.join(basedir1, "tahoe.cfg"),
@@ -353,15 +359,11 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
 
         class Boom(Exception):
             pass
-        def BoomMagicFolder(client, upload_dircap, local_dir_utf8, inotify=None):
+        def BoomMagicFolder(client, upload_dircap, collective_dircap, local_dir, dbfile,
+                            inotify=None, pending_delay=1.0):
             raise Boom()
         self.patch(allmydata.frontends.magic_folder, 'MagicFolder', BoomMagicFolder)
 
-        logged_messages = []
-        def mock_log(*args, **kwargs):
-            logged_messages.append("%r %r" % (args, kwargs))
-        self.patch(allmydata.util.log, 'msg', mock_log)
-
         basedir2 = "test_client.Basic.test_create_magic_folder_service2"
         os.mkdir(basedir2)
         os.mkdir(os.path.join(basedir2, "private"))
@@ -371,10 +373,8 @@ class Basic(testutil.ReallyEqualMixin, testutil.NonASCIIPathMixin, unittest.Test
                        "enabled = true\n" +
                        "local.directory = " + local_dir_utf8 + "\n")
         fileutil.write(os.path.join(basedir2, "private", "magic_folder_dircap"), "URI:DIR2:blah")
-        c2 = client.Client(basedir2)
-        self.failUnlessRaises(KeyError, c2.getServiceNamed, 'magic-folder')
-        self.failUnless([True for arg in logged_messages if "Boom" in arg],
-                        logged_messages)
+        fileutil.write(os.path.join(basedir2, "private", "collective_dircap"), "URI:DIR2:meow")
+        self.failUnlessRaises(Boom, client.Client, basedir2)
 
 
 def flush_but_dont_ignore(res):
diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py
index 891f226a..e0396334 100644
--- a/src/allmydata/test/test_magic_folder.py
+++ b/src/allmydata/test/test_magic_folder.py
@@ -2,10 +2,9 @@
 import os, sys
 
 from twisted.trial import unittest
-from twisted.python import runtime
 from twisted.internet import defer
 
-from allmydata.interfaces import IDirectoryNode, NoSuchChildError
+from allmydata.interfaces import IDirectoryNode
 
 from allmydata.util import fake_inotify, fileutil
 from allmydata.util.encodingutil import get_filesystem_encoding, to_filepath
@@ -13,12 +12,15 @@ from allmydata.util.consumer import download_to_data
 from allmydata.test.no_network import GridTestMixin
 from allmydata.test.common_util import ReallyEqualMixin, NonASCIIPathMixin
 from allmydata.test.common import ShouldFailMixin
+from .test_cli_magic_folder import MagicFolderCLITestMixin
 
-from allmydata.frontends.magic_folder import MagicFolder
+from allmydata.frontends import magic_folder
+from allmydata.frontends.magic_folder import MagicFolder, Downloader
+from allmydata import backupdb, magicpath
 from allmydata.util.fileutil import abspath_expanduser_unicode
 
 
-class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
+class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqualMixin, NonASCIIPathMixin):
     """
     These tests will be run both with a mock notifier, and (on platforms that support it)
     with the real INotify.
@@ -28,66 +30,240 @@ class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, Non
         GridTestMixin.setUp(self)
         temp = self.mktemp()
         self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding()))
-    def _get_count(self, name):
-        return self.stats_provider.get_stats()["counters"].get(name, 0)
+        self.magicfolder = None
+
+    def _get_count(self, name, client=None):
+        counters = (client or self.get_client()).stats_provider.get_stats()["counters"]
+        return counters.get('magic_folder.%s' % (name,), 0)
+
+    def _createdb(self):
+        dbfile = abspath_expanduser_unicode(u"magicfolderdb.sqlite", base=self.basedir)
+        bdb = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
+        self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
+        self.failUnlessEqual(bdb.VERSION, 3)
+        return bdb
+
+    def _restart_client(self, ign):
+        #print "_restart_client"
+        d = self.restart_client()
+        d.addCallback(self._wait_until_started)
+        return d
 
-    def _test(self):
-        self.uploader = None
+    def _wait_until_started(self, ign):
+        #print "_wait_until_started"
+        self.magicfolder = self.get_client().getServiceNamed('magic-folder')
+        return self.magicfolder.ready()
+
+    def test_db_basic(self):
+        fileutil.make_dirs(self.basedir)
+        self._createdb()
+
+    def test_db_persistence(self):
+        """Test that a file upload creates an entry in the database."""
+
+        fileutil.make_dirs(self.basedir)
+        db = self._createdb()
+
+        relpath1 = u"myFile1"
+        pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False,
+                                     exists=True, size=1, mtime=123, ctime=456)
+        db.did_upload_version('URI:LIT:1', relpath1, 0, pathinfo)
+
+        c = db.cursor
+        c.execute("SELECT size, mtime, ctime"
+                  " FROM local_files"
+                  " WHERE path=?",
+                  (relpath1,))
+        row = c.fetchone()
+        self.failUnlessEqual(row, (pathinfo.size, pathinfo.mtime, pathinfo.ctime))
+
+        # Second test uses db.is_new_file instead of SQL query directly
+        # to confirm the previous upload entry in the db.
+        relpath2 = u"myFile2"
+        path2 = os.path.join(self.basedir, relpath2)
+        fileutil.write(path2, "meow\n")
+        pathinfo = fileutil.get_pathinfo(path2)
+        db.did_upload_version('URI:LIT:2', relpath2, 0, pathinfo)
+        self.failUnlessFalse(db.is_new_file(pathinfo, relpath2))
+
+        different_pathinfo = fileutil.PathInfo(isdir=False, isfile=True, islink=False,
+                                               exists=True, size=0, mtime=pathinfo.mtime, ctime=pathinfo.ctime)
+        self.failUnlessTrue(db.is_new_file(different_pathinfo, relpath2))
+
+    def test_magicfolder_start_service(self):
         self.set_up_grid()
-        self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
+
+        self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+                                                    base=self.basedir)
         self.mkdir_nonascii(self.local_dir)
 
-        self.client = self.g.clients[0]
-        self.stats_provider = self.client.stats_provider
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0))
 
-        d = self.client.create_dirnode()
-        def _made_upload_dir(n):
-            self.failUnless(IDirectoryNode.providedBy(n))
-            self.upload_dirnode = n
-            self.upload_dircap = n.get_uri()
-            self.uploader = DropUploader(self.client, self.upload_dircap, self.local_dir.encode('utf-8'),
-                                         inotify=self.inotify)
-            return self.uploader.startService()
-        d.addCallback(_made_upload_dir)
+        d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir))
+        d.addCallback(self._restart_client)
+
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 1))
+        d.addBoth(self.cleanup)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.dirs_monitored'), 0))
+        return d
+
+    def test_move_tree(self):
+        self.set_up_grid()
+
+        self.local_dir = abspath_expanduser_unicode(self.unicode_or_fallback(u"l\u00F8cal_dir", u"local_dir"),
+                                                    base=self.basedir)
+        self.mkdir_nonascii(self.local_dir)
+
+        empty_tree_name = self.unicode_or_fallback(u"empty_tr\u00EAe", u"empty_tree")
+        empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir)
+        new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir)
+
+        small_tree_name = self.unicode_or_fallback(u"small_tr\u00EAe", u"empty_tree")
+        small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.basedir)
+        new_small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.local_dir)
+
+        d = self.create_invite_join_magic_folder(u"Alice", self.local_dir)
+        d.addCallback(self._restart_client)
+
+        def _check_move_empty_tree(res):
+            #print "_check_move_empty_tree"
+            self.mkdir_nonascii(empty_tree_dir)
+            d2 = self.magicfolder.uploader.set_hook('processed')
+            os.rename(empty_tree_dir, new_empty_tree_dir)
+            self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
+            return d2
+        d.addCallback(_check_move_empty_tree)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 1))
+
+        def _check_move_small_tree(res):
+            #print "_check_move_small_tree"
+            self.mkdir_nonascii(small_tree_dir)
+            fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
+            d2 = self.magicfolder.uploader.set_hook('processed', ignore_count=1)
+            os.rename(small_tree_dir, new_small_tree_dir)
+            self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
+            return d2
+        d.addCallback(_check_move_small_tree)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 3))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+        def _check_moved_tree_is_watched(res):
+            #print "_check_moved_tree_is_watched"
+            d2 = self.magicfolder.uploader.set_hook('processed')
+            fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
+            self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
+            return d2
+        d.addCallback(_check_moved_tree_is_watched)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+        # Files that are moved out of the upload directory should no longer be watched.
+        #def _move_dir_away(ign):
+        #    os.rename(new_empty_tree_dir, empty_tree_dir)
+        #    # Wuh? Why don't we get this event for the real test?
+        #    #self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_FROM)
+        #d.addCallback(_move_dir_away)
+        #def create_file(val):
+        #    test_file = abspath_expanduser_unicode(u"what", base=empty_tree_dir)
+        #    fileutil.write(test_file, "meow")
+        #    #self.notify(...)
+        #    return
+        #d.addCallback(create_file)
+        #d.addCallback(lambda ign: time.sleep(1))  # XXX ICK
+        #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 4))
+        #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 2))
+        #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        #d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
+
+        d.addBoth(self.cleanup)
+        return d
+
+    def test_persistence(self):
+        """
+        Perform an upload of a given file and then stop the client.
+        Start a new client and magic-folder service... and verify that the file is NOT uploaded
+        a second time. This test is meant to test the database persistence along with
+        the startup and shutdown code paths of the magic-folder service.
+        """
+        self.set_up_grid()
+        self.local_dir = abspath_expanduser_unicode(u"test_persistence", base=self.basedir)
+        self.mkdir_nonascii(self.local_dir)
+        self.collective_dircap = ""
+
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: self.create_invite_join_magic_folder(u"Alice", self.local_dir))
+        d.addCallback(self._restart_client)
+
+        def create_test_file(filename):
+            d2 = self.magicfolder.uploader.set_hook('processed')
+            test_file = abspath_expanduser_unicode(filename, base=self.local_dir)
+            fileutil.write(test_file, "meow %s" % filename)
+            self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
+            return d2
+        d.addCallback(lambda ign: create_test_file(u"what1"))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addCallback(self.cleanup)
+
+        d.addCallback(self._restart_client)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addCallback(lambda ign: create_test_file(u"what2"))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 2))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        d.addBoth(self.cleanup)
+        return d
+
+    def test_magic_folder(self):
+        self.set_up_grid()
+        self.local_dir = os.path.join(self.basedir, self.unicode_or_fallback(u"loc\u0101l_dir", u"local_dir"))
+        self.mkdir_nonascii(self.local_dir)
+
+        d = self.create_invite_join_magic_folder(u"Alice\u0101", self.local_dir)
+        d.addCallback(self._restart_client)
 
         # Write something short enough for a LIT file.
-        d.addCallback(lambda ign: self._test_file(u"short", "test"))
+        d.addCallback(lambda ign: self._check_file(u"short", "test"))
 
         # Write to the same file again with different data.
-        d.addCallback(lambda ign: self._test_file(u"short", "different"))
+        d.addCallback(lambda ign: self._check_file(u"short", "different"))
 
         # Test that temporary files are not uploaded.
-        d.addCallback(lambda ign: self._test_file(u"tempfile", "test", temporary=True))
+        d.addCallback(lambda ign: self._check_file(u"tempfile", "test", temporary=True))
 
         # Test that we tolerate creation of a subdirectory.
         d.addCallback(lambda ign: os.mkdir(os.path.join(self.local_dir, u"directory")))
 
         # Write something longer, and also try to test a Unicode name if the fs can represent it.
         name_u = self.unicode_or_fallback(u"l\u00F8ng", u"long")
-        d.addCallback(lambda ign: self._test_file(name_u, "test"*100))
+        d.addCallback(lambda ign: self._check_file(name_u, "test"*100))
 
         # TODO: test that causes an upload failure.
-        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
 
-        # Prevent unclean reactor errors.
-        def _cleanup(res):
-            d = defer.succeed(None)
-            if self.uploader is not None:
-                d.addCallback(lambda ign: self.uploader.finish(for_tests=True))
-            d.addCallback(lambda ign: res)
-            return d
-        d.addBoth(_cleanup)
+        d.addBoth(self.cleanup)
         return d
 
-    def _test_file(self, name_u, data, temporary=False):
-        previously_uploaded = self._get_count('drop_upload.files_uploaded')
-        previously_disappeared = self._get_count('drop_upload.files_disappeared')
-
-        d = defer.Deferred()
+    def _check_file(self, name_u, data, temporary=False):
+        previously_uploaded = self._get_count('uploader.objects_succeeded')
+        previously_disappeared = self._get_count('uploader.objects_disappeared')
 
-        # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
-        # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
-        self.uploader.set_uploaded_callback(d.callback)
+        d = self.magicfolder.uploader.set_hook('processed')
 
         path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
         path = to_filepath(path_u)
@@ -103,28 +279,150 @@ class MagicFolderTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin, Non
             f.close()
         if temporary and sys.platform == "win32":
             os.unlink(path_u)
+            self.notify(path, self.inotify.IN_DELETE)
         fileutil.flush_volume(path_u)
-        self.notify_close_write(path)
+        self.notify(path, self.inotify.IN_CLOSE_WRITE)
 
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
         if temporary:
-            d.addCallback(lambda ign: self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
-                                                      self.upload_dirnode.get, name_u))
-            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
+            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_disappeared'),
                                                                  previously_disappeared + 1))
         else:
             d.addCallback(lambda ign: self.upload_dirnode.get(name_u))
             d.addCallback(download_to_data)
             d.addCallback(lambda actual_data: self.failUnlessReallyEqual(actual_data, data))
-            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
+            d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'),
                                                                  previously_uploaded + 1))
 
-        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
+        return d
+
+    def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version):
+        encoded_name_u = magicpath.path2magic(relpath_u)
+        d = magicfolder.downloader._get_collective_latest_file(encoded_name_u)
+        def check_latest(result):
+            if result[0] is not None:
+                node, metadata = result
+                d.addCallback(lambda ign: self.failUnlessEqual(metadata['version'], expected_version))
+        d.addCallback(check_latest)
         return d
 
+    def _check_version_in_local_db(self, magicfolder, relpath_u, expected_version):
+        version = magicfolder._db.get_local_file_version(relpath_u)
+        #print "_check_version_in_local_db: %r has version %s" % (relpath_u, version)
+        self.failUnlessEqual(version, expected_version)
+
+    def test_alice_bob(self):
+        d = self.setup_alice_and_bob()
+        def get_results(result):
+            # XXX are these used?
+            (self.alice_collective_dircap, self.alice_upload_dircap, self.alice_magicfolder,
+             self.bob_collective_dircap,   self.bob_upload_dircap,   self.bob_magicfolder) = result
+            #print "Alice magicfolderdb is at %r" % (self.alice_magicfolder._client.basedir)
+            #print "Bob   magicfolderdb is at %r" % (self.bob_magicfolder._client.basedir)
+        d.addCallback(get_results)
+
+        def Alice_write_a_file(result):
+            print "Alice writes a file\n"
+            self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
+            fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
+            self.magicfolder = self.alice_magicfolder
+            self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
+
+        d.addCallback(Alice_write_a_file)
+
+        def Alice_wait_for_upload(result):
+            print "Alice waits for an upload\n"
+            d2 = self.alice_magicfolder.uploader.set_hook('processed')
+            return d2
+        d.addCallback(Alice_wait_for_upload)
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 0))
+
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 1))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0))
+
+        def Bob_wait_for_download(result):
+            print "Bob waits for a download\n"
+            d2 = self.bob_magicfolder.downloader.set_hook('processed')
+            return d2
+        d.addCallback(Bob_wait_for_download)
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0))
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 0)) # XXX prolly not needed
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 1))
+
+
+        # test deletion of file behavior
+        def Alice_delete_file(result):
+            print "Alice deletes the file!\n"
+            os.unlink(self.file_path)
+            self.notify(to_filepath(self.file_path), self.inotify.IN_DELETE)
+
+            return None
+        d.addCallback(Alice_delete_file)
+        d.addCallback(Alice_wait_for_upload)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 2))
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 1))
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 1))
+
+        d.addCallback(Bob_wait_for_download)
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 2))
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1))
+
+        def Alice_rewrite_file(result):
+            print "Alice rewrites file\n"
+            self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
+            fileutil.write(self.file_path, "Alice suddenly sees the white rabbit running into the forest.")
+            self.magicfolder = self.alice_magicfolder
+            self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
+
+        d.addCallback(Alice_rewrite_file)
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1))
+        d.addCallback(Alice_wait_for_upload)
+
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 2))
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 2))
+
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded', client=self.alice_magicfolder._client), 3))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded', client=self.alice_magicfolder._client), 3))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued', client=self.alice_magicfolder._client), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created', client=self.alice_magicfolder._client), 0))
+
+        d.addCallback(Bob_wait_for_download)
+        d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 2))
+        d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 2))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_failed'), 0))
+        d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), 3))
+
+        def cleanup_Alice_and_Bob(result):
+            print "cleanup alice bob test\n"
+            d = defer.succeed(None)
+            d.addCallback(lambda ign: self.alice_magicfolder.finish())
+            d.addCallback(lambda ign: self.bob_magicfolder.finish())
+            d.addCallback(lambda ign: result)
+            return d
+        d.addCallback(cleanup_Alice_and_Bob)
+        return d
 
 class MockTest(MagicFolderTestMixin, unittest.TestCase):
     """This can run on any platform, and even if twisted.internet.inotify can't be imported."""
 
+    def setUp(self):
+        MagicFolderTestMixin.setUp(self)
+        self.inotify = fake_inotify
+        self.patch(magic_folder, 'get_inotify_module', lambda: self.inotify)
+
+    def notify(self, path, mask):
+        self.magicfolder.uploader._notifier.event(path, mask)
+
     def test_errors(self):
         self.set_up_grid()
 
@@ -137,44 +435,78 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase):
 
         client = self.g.clients[0]
         d = client.create_dirnode()
-        def _made_upload_dir(n):
+        def _check_errors(n):
             self.failUnless(IDirectoryNode.providedBy(n))
             upload_dircap = n.get_uri()
             readonly_dircap = n.get_readonly_uri()
 
             self.shouldFail(AssertionError, 'nonexistent local.directory', 'there is no directory',
-                            MagicFolder, client, upload_dircap, doesnotexist, inotify=fake_inotify)
+                            MagicFolder, client, upload_dircap, '', doesnotexist, magicfolderdb)
             self.shouldFail(AssertionError, 'non-directory local.directory', 'is not a directory',
-                            MagicFolder, client, upload_dircap, not_a_dir, inotify=fake_inotify)
+                            MagicFolder, client, upload_dircap, '', not_a_dir, magicfolderdb)
             self.shouldFail(AssertionError, 'bad upload.dircap', 'does not refer to a directory',
-                            MagicFolder, client, 'bad', errors_dir, inotify=fake_inotify)
+                            MagicFolder, client, 'bad', '', errors_dir, magicfolderdb)
             self.shouldFail(AssertionError, 'non-directory upload.dircap', 'does not refer to a directory',
-                            MagicFolder, client, 'URI:LIT:foo', errors_dir, inotify=fake_inotify)
+                            MagicFolder, client, 'URI:LIT:foo', '', errors_dir, magicfolderdb)
             self.shouldFail(AssertionError, 'readonly upload.dircap', 'is not a writecap to a directory',
-                            MagicFolder, client, readonly_dircap, errors_dir, inotify=fake_inotify)
-        d.addCallback(_made_upload_dir)
+                            MagicFolder, client, readonly_dircap, '', errors_dir, magicfolderdb,)
+            self.shouldFail(AssertionError, 'collective dircap',
+                            "The URI in 'private/collective_dircap' is not a readonly cap to a directory.",
+                            MagicFolder, client, upload_dircap, upload_dircap, errors_dir, magicfolderdb)
+
+            def _not_implemented():
+                raise NotImplementedError("blah")
+            self.patch(magic_folder, 'get_inotify_module', _not_implemented)
+            self.shouldFail(NotImplementedError, 'unsupported', 'blah',
+                            MagicFolder, client, upload_dircap, '', errors_dir, magicfolderdb)
+        d.addCallback(_check_errors)
         return d
 
-    def test_drop_upload(self):
-        self.inotify = fake_inotify
-        self.basedir = "drop_upload.MockTest.test_drop_upload"
-        return self._test()
+    def test_write_downloaded_file(self):
+        workdir = u"cli/MagicFolder/write-downloaded-file"
+        local_file = fileutil.abspath_expanduser_unicode(os.path.join(workdir, "foobar"))
+
+        # create a file with name "foobar" with content "foo"
+        # write downloaded file content "bar" into "foobar" with is_conflict = False
+        fileutil.make_dirs(workdir)
+        fileutil.write(local_file, "foo")
+
+        # if is_conflict is False, then the .conflict file shouldn't exist.
+        Downloader._write_downloaded_file(local_file, "bar", False, None)
+        conflicted_path = local_file + u".conflict"
+        self.failIf(os.path.exists(conflicted_path))
 
-    def notify_close_write(self, path):
-        self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
+        # At this point, the backup file should exist with content "foo"
+        backup_path = local_file + u".backup"
+        self.failUnless(os.path.exists(backup_path))
+        self.failUnlessEqual(fileutil.read(backup_path), "foo")
+
+        # .tmp file shouldn't exist
+        self.failIf(os.path.exists(local_file + u".tmp"))
+
+        # .. and the original file should have the new content
+        self.failUnlessEqual(fileutil.read(local_file), "bar")
+
+        # now a test for conflicted case
+        Downloader._write_downloaded_file(local_file, "bar", True, None)
+        self.failUnless(os.path.exists(conflicted_path))
+
+        # .tmp file shouldn't exist
+        self.failIf(os.path.exists(local_file + u".tmp"))
 
 
 class RealTest(MagicFolderTestMixin, unittest.TestCase):
     """This is skipped unless both Twisted and the platform support inotify."""
 
-    def test_drop_upload(self):
-        self.inotify = None  # use the appropriate inotify for the platform
-        self.basedir = "drop_upload.RealTest.test_drop_upload"
-        return self._test()
+    def setUp(self):
+        MagicFolderTestMixin.setUp(self)
+        self.inotify = magic_folder.get_inotify_module()
 
-    def notify_close_write(self, path):
-        # Writing to the file causes the notification.
+    def notify(self, path, mask):
+        # Writing to the filesystem causes the notification.
         pass
 
-if sys.platform != "win32" and not runtime.platform.supportsINotify():
+try:
+    magic_folder.get_inotify_module()
+except NotImplementedError:
     RealTest.skip = "Magic Folder support can only be tested for-real on an OS that supports inotify or equivalent."
diff --git a/src/allmydata/test/test_magicpath.py b/src/allmydata/test/test_magicpath.py
new file mode 100644
index 00000000..1227a2c4
--- /dev/null
+++ b/src/allmydata/test/test_magicpath.py
@@ -0,0 +1,28 @@
+
+from twisted.trial import unittest
+
+from allmydata import magicpath
+
+
+class MagicPath(unittest.TestCase):
+    tests = {
+        u"Documents/work/critical-project/qed.txt": u"Documents@_work@_critical-project@_qed.txt",
+        u"Documents/emails/bunnyfufu@hoppingforest.net": u"Documents@_emails@_bunnyfufu@@hoppingforest.net",
+        u"foo/@/bar": u"foo@_@@@_bar",
+    }
+
+    def test_path2magic(self):
+        for test, expected in self.tests.items():
+            self.failUnlessEqual(magicpath.path2magic(test), expected)
+
+    def test_magic2path(self):
+        for expected, test in self.tests.items():
+            self.failUnlessEqual(magicpath.magic2path(test), expected)
+
+    def test_should_ignore(self):
+        self.failUnlessEqual(magicpath.should_ignore_file(u".bashrc"), True)
+        self.failUnlessEqual(magicpath.should_ignore_file(u"bashrc."), False)
+        self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/.bashrc"), True)
+        self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/.branch/bashrc"), True)
+        self.failUnlessEqual(magicpath.should_ignore_file(u"forest/.tree/branch/bashrc"), True)
+        self.failUnlessEqual(magicpath.should_ignore_file(u"forest/tree/branch/bashrc"), False)
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
index db64bf19..4b7354e9 100644
--- a/src/allmydata/test/test_util.py
+++ b/src/allmydata/test/test_util.py
@@ -441,6 +441,74 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
         self.failIf(os.path.exists(fn))
         self.failUnless(os.path.exists(fn2))
 
+    def test_rename_no_overwrite(self):
+        workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
+        fileutil.make_dirs(workdir)
+
+        source_path = os.path.join(workdir, "source")
+        dest_path   = os.path.join(workdir, "dest")
+
+        # when neither file exists
+        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+
+        # when only dest exists
+        fileutil.write(dest_path,   "dest")
+        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+        self.failUnlessEqual(fileutil.read(dest_path),   "dest")
+
+        # when both exist
+        fileutil.write(source_path, "source")
+        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
+        self.failUnlessEqual(fileutil.read(source_path), "source")
+        self.failUnlessEqual(fileutil.read(dest_path),   "dest")
+
+        # when only source exists
+        os.remove(dest_path)
+        fileutil.rename_no_overwrite(source_path, dest_path)
+        self.failUnlessEqual(fileutil.read(dest_path), "source")
+        self.failIf(os.path.exists(source_path))
+
+    def test_replace_file(self):
+        workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
+        fileutil.make_dirs(workdir)
+
+        backup_path      = os.path.join(workdir, "backup")
+        replaced_path    = os.path.join(workdir, "replaced")
+        replacement_path = os.path.join(workdir, "replacement")
+
+        # when none of the files exist
+        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+
+        # when only replaced exists
+        fileutil.write(replaced_path,    "foo")
+        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path, backup_path)
+        self.failUnlessEqual(fileutil.read(replaced_path), "foo")
+
+        # when both replaced and replacement exist, but not backup
+        fileutil.write(replacement_path, "bar")
+        fileutil.replace_file(replaced_path, replacement_path, backup_path)
+        self.failUnlessEqual(fileutil.read(backup_path),   "foo")
+        self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+        self.failIf(os.path.exists(replacement_path))
+
+        # when only replacement exists
+        os.remove(backup_path)
+        os.remove(replaced_path)
+        fileutil.write(replacement_path, "bar")
+        fileutil.replace_file(replaced_path, replacement_path, backup_path)
+        self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+        self.failIf(os.path.exists(replacement_path))
+        self.failIf(os.path.exists(backup_path))
+
+        # when replaced, replacement and backup all exist
+        fileutil.write(replaced_path,    "foo")
+        fileutil.write(replacement_path, "bar")
+        fileutil.write(backup_path,      "bak")
+        fileutil.replace_file(replaced_path, replacement_path, backup_path)
+        self.failUnlessEqual(fileutil.read(backup_path),   "foo")
+        self.failUnlessEqual(fileutil.read(replaced_path), "bar")
+        self.failIf(os.path.exists(replacement_path))
+
     def test_du(self):
         basedir = "util/FileUtil/test_du"
         fileutil.make_dirs(basedir)
@@ -567,6 +635,50 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
         disk = fileutil.get_disk_stats('.', 2**128)
         self.failUnlessEqual(disk['avail'], 0)
 
+    def test_get_pathinfo(self):
+        basedir = "util/FileUtil/test_get_pathinfo"
+        fileutil.make_dirs(basedir)
+
+        # create a directory
+        self.mkdir(basedir, "a")
+        dirinfo = fileutil.get_pathinfo(basedir)
+        self.failUnlessTrue(dirinfo.isdir)
+        self.failUnlessTrue(dirinfo.exists)
+        self.failUnlessFalse(dirinfo.isfile)
+        self.failUnlessFalse(dirinfo.islink)
+
+        # create a file under the directory
+        f = os.path.join(basedir, "a", "1.txt")
+        self.touch(basedir, "a/1.txt", data="a"*10)
+        fileinfo = fileutil.get_pathinfo(f)
+        self.failUnlessTrue(fileinfo.isfile)
+        self.failUnlessTrue(fileinfo.exists)
+        self.failUnlessFalse(fileinfo.isdir)
+        self.failUnlessFalse(fileinfo.islink)
+        self.failUnlessEqual(fileinfo.size, 10)
+
+        # create a symlink under the directory a pointing to 1.txt
+        slname = os.path.join(basedir, "a", "linkto1.txt")
+        os.symlink(f, slname)
+        symlinkinfo = fileutil.get_pathinfo(slname)
+        self.failUnlessTrue(symlinkinfo.islink)
+        self.failUnlessTrue(symlinkinfo.exists)
+        self.failUnlessFalse(symlinkinfo.isfile)
+        self.failUnlessFalse(symlinkinfo.isdir)
+
+        # path at which nothing exists
+        dnename = os.path.join(basedir, "a", "doesnotexist")
+        now = time.time()
+        dneinfo = fileutil.get_pathinfo(dnename, now=now)
+        self.failUnlessFalse(dneinfo.exists)
+        self.failUnlessFalse(dneinfo.isfile)
+        self.failUnlessFalse(dneinfo.isdir)
+        self.failUnlessFalse(dneinfo.islink)
+        self.failUnlessEqual(dneinfo.size, None)
+        self.failUnlessEqual(dneinfo.mtime, now)
+        self.failUnlessEqual(dneinfo.ctime, now)
+
+
 class PollMixinTests(unittest.TestCase):
     def setUp(self):
         self.pm = pollmixin.PollMixin()
diff --git a/src/allmydata/util/deferredutil.py b/src/allmydata/util/deferredutil.py
index 989e85e8..9b37cb27 100644
--- a/src/allmydata/util/deferredutil.py
+++ b/src/allmydata/util/deferredutil.py
@@ -5,6 +5,7 @@ from foolscap.api import eventually, fireEventually
 from twisted.internet import defer, reactor
 
 from allmydata.util import log
+from allmydata.util.assertutil import _assert
 from allmydata.util.pollmixin import PollMixin
 
 
@@ -77,28 +78,35 @@ class HookMixin:
     I am a helper mixin that maintains a collection of named hooks, primarily
     for use in tests. Each hook is set to an unfired Deferred using 'set_hook',
     and can then be fired exactly once at the appropriate time by '_call_hook'.
+    If 'ignore_count' is given, that number of calls to '_call_hook' will be
+    ignored before firing the hook.
 
     I assume a '_hooks' attribute that should set by the class constructor to
     a dict mapping each valid hook name to None.
     """
-    def set_hook(self, name, d=None):
+    def set_hook(self, name, d=None, ignore_count=0):
         """
         Called by the hook observer (e.g. by a test).
         If d is not given, an unfired Deferred is created and returned.
         The hook must not already be set.
         """
+        self._log("set_hook %r, ignore_count=%r" % (name, ignore_count))
         if d is None:
             d = defer.Deferred()
-        assert self._hooks[name] is None, self._hooks[name]
-        assert isinstance(d, defer.Deferred), d
-        self._hooks[name] = d
+        _assert(ignore_count >= 0, ignore_count=ignore_count)
+        _assert(name in self._hooks, name=name)
+        _assert(self._hooks[name] is None, name=name, hook=self._hooks[name])
+        _assert(isinstance(d, defer.Deferred), d=d)
+
+        self._hooks[name] = (d, ignore_count)
         return d
 
     def _call_hook(self, res, name):
         """
-        Called to trigger the hook, with argument 'res'. This is a no-op if the
-        hook is unset. Otherwise, the hook will be unset, and then its Deferred
-        will be fired synchronously.
+        Called to trigger the hook, with argument 'res'. This is a no-op if
+        the hook is unset. If the hook's ignore_count is positive, it will be
+        decremented; if it was already zero, the hook will be unset, and then
+        its Deferred will be fired synchronously.
 
         The expected usage is "deferred.addBoth(self._call_hook, 'hookname')".
         This ensures that if 'res' is a failure, the hook will be errbacked,
@@ -106,13 +114,22 @@ class HookMixin:
         'res' is returned so that the current result or failure will be passed
         through.
         """
-        d = self._hooks[name]
-        if d is None:
+        hook = self._hooks[name]
+        if hook is None:
             return defer.succeed(None)
-        self._hooks[name] = None
-        _with_log(d.callback, res)
+
+        (d, ignore_count) = hook
+        self._log("call_hook %r, ignore_count=%r" % (name, ignore_count))
+        if ignore_count > 0:
+            self._hooks[name] = (d, ignore_count - 1)
+        else:
+            self._hooks[name] = None
+            _with_log(d.callback, res)
         return res
 
+    def _log(self, msg):
+        log.msg(msg, level=log.NOISY)
+
 
 def async_iterate(process, iterable, *extra_args, **kwargs):
     """
diff --git a/src/allmydata/util/encodingutil.py b/src/allmydata/util/encodingutil.py
index ae54afb9..a7a2bfef 100644
--- a/src/allmydata/util/encodingutil.py
+++ b/src/allmydata/util/encodingutil.py
@@ -6,7 +6,7 @@ unicode and back.
 import sys, os, re, locale
 from types import NoneType
 
-from allmydata.util.assertutil import precondition
+from allmydata.util.assertutil import precondition, _assert
 from twisted.python import usage
 from twisted.python.filepath import FilePath
 from allmydata.util import log
@@ -63,7 +63,11 @@ def _reload():
 
     is_unicode_platform = sys.platform in ["win32", "darwin"]
 
-    use_unicode_filepath = sys.platform == "win32" or hasattr(FilePath, '_asTextPath')
+    # Despite the Unicode-mode FilePath support added to Twisted in
+    # <https://twistedmatrix.com/trac/ticket/7805>, we can't yet use
+    # Unicode-mode FilePaths with INotify on non-Windows platforms
+    # due to <https://twistedmatrix.com/trac/ticket/7928>.
+    use_unicode_filepath = sys.platform == "win32"
 
 _reload()
 
@@ -249,6 +253,22 @@ def quote_local_unicode_path(path, quotemarks=True):
 
     return quote_output(path, quotemarks=quotemarks, quote_newlines=True)
 
+def quote_filepath(path, quotemarks=True):
+    return quote_local_unicode_path(unicode_from_filepath(path), quotemarks=quotemarks)
+
+def extend_filepath(fp, segments):
+    # We cannot use FilePath.preauthChild, because
+    # * it has the security flaw described in <https://twistedmatrix.com/trac/ticket/6527>;
+    # * it may return a FilePath in the wrong mode.
+
+    for segment in segments:
+        fp = fp.child(segment)
+
+    if isinstance(fp.path, unicode) and not use_unicode_filepath:
+        return FilePath(fp.path.encode(filesystem_encoding))
+    else:
+        return fp
+
 def to_filepath(path):
     precondition(isinstance(path, basestring), path=path)
 
@@ -257,15 +277,28 @@ def to_filepath(path):
 
     return FilePath(path)
 
+def _decode(s):
+    precondition(isinstance(s, basestring), s=s)
+
+    if isinstance(s, bytes):
+        return s.decode(filesystem_encoding)
+    else:
+        return s
+
 def unicode_from_filepath(fp):
     precondition(isinstance(fp, FilePath), fp=fp)
+    return _decode(fp.path)
 
-    path = fp.path
-    if isinstance(path, bytes):
-        path = path.decode(filesystem_encoding)
-
-    return path
+def unicode_segments_from(base_fp, ancestor_fp):
+    precondition(isinstance(base_fp, FilePath), base_fp=base_fp)
+    precondition(isinstance(ancestor_fp, FilePath), ancestor_fp=ancestor_fp)
 
+    if hasattr(FilePath, 'asTextMode'):
+        return base_fp.asTextMode().segmentsFrom(ancestor_fp.asTextMode())
+    else:
+        bpt, apt = (type(base_fp.path), type(ancestor_fp.path))
+        _assert(bpt == apt, bpt=bpt, apt=apt)
+        return map(_decode, base_fp.segmentsFrom(ancestor_fp))
 
 def unicode_platform():
     """
@@ -313,3 +346,6 @@ def listdir_unicode(path):
         return os.listdir(path)
     else:
         return listdir_unicode_fallback(path)
+
+def listdir_filepath(fp):
+    return listdir_unicode(unicode_from_filepath(fp))
diff --git a/src/allmydata/util/fileutil.py b/src/allmydata/util/fileutil.py
index 54e34484..8194fcd4 100644
--- a/src/allmydata/util/fileutil.py
+++ b/src/allmydata/util/fileutil.py
@@ -3,6 +3,8 @@ Futz with files like a pro.
 """
 
 import sys, exceptions, os, stat, tempfile, time, binascii
+from collections import namedtuple
+from errno import ENOENT
 
 from twisted.python import log
 
@@ -518,8 +520,7 @@ def get_available_space(whichdir, reserved_space):
 
 
 if sys.platform == "win32":
-    from ctypes import WINFUNCTYPE, windll, WinError
-    from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID
+    from ctypes.wintypes import BOOL, HANDLE, DWORD, LPCWSTR, LPVOID, WinError, get_last_error
 
     # <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
     CreateFileW = WINFUNCTYPE(HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE) \
@@ -560,3 +561,98 @@ else:
     def flush_volume(path):
         # use sync()?
         pass
+
+
+class ConflictError(Exception):
+    pass
+
+class UnableToUnlinkReplacementError(Exception):
+    pass
+
+def reraise(wrapper):
+    _, exc, tb = sys.exc_info()
+    wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc))
+    raise wrapper_exc.__class__, wrapper_exc, tb
+
+if sys.platform == "win32":
+    from ctypes import WINFUNCTYPE, windll, WinError, get_last_error
+    from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPVOID
+
+    # <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx>
+    ReplaceFileW = WINFUNCTYPE(
+        BOOL,
+          LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID,
+        use_last_error=True
+      )(("ReplaceFileW", windll.kernel32))
+
+    REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002
+
+    def rename_no_overwrite(source_path, dest_path):
+        os.rename(source_path, dest_path)
+
+    def replace_file(replaced_path, replacement_path, backup_path):
+        precondition_abspath(replaced_path)
+        precondition_abspath(replacement_path)
+        precondition_abspath(backup_path)
+
+        r = ReplaceFileW(replaced_path, replacement_path, backup_path,
+                         REPLACEFILE_IGNORE_MERGE_ERRORS, None, None)
+        if r == 0:
+            # The UnableToUnlinkReplacementError case does not happen on Windows;
+            # all errors should be treated as signalling a conflict.
+            err = get_last_error()
+            raise ConflictError("WinError: %s" % (WinError(err)))
+else:
+    def rename_no_overwrite(source_path, dest_path):
+        # link will fail with EEXIST if there is already something at dest_path.
+        os.link(source_path, dest_path)
+        try:
+            os.unlink(source_path)
+        except EnvironmentError:
+            reraise(UnableToUnlinkReplacementError)
+
+    def replace_file(replaced_path, replacement_path, backup_path):
+        precondition_abspath(replaced_path)
+        precondition_abspath(replacement_path)
+        precondition_abspath(backup_path)
+
+        if not os.path.exists(replacement_path):
+            raise ConflictError("Replacement file not found: %r" % (replacement_path,))
+
+        try:
+            os.rename(replaced_path, backup_path)
+        except OSError as e:
+            if e.errno != ENOENT:
+                raise
+        try:
+            rename_no_overwrite(replacement_path, replaced_path)
+        except EnvironmentError:
+            reraise(ConflictError)
+
+PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime')
+
+def get_pathinfo(path_u, now=None):
+    try:
+        statinfo = os.lstat(path_u)
+        mode = statinfo.st_mode
+        return PathInfo(isdir =stat.S_ISDIR(mode),
+                        isfile=stat.S_ISREG(mode),
+                        islink=stat.S_ISLNK(mode),
+                        exists=True,
+                        size  =statinfo.st_size,
+                        mtime =statinfo.st_mtime,
+                        ctime =statinfo.st_ctime,
+                       )
+    except OSError as e:
+        if e.errno == ENOENT:
+            if now is None:
+                now = time.time()
+            return PathInfo(isdir =False,
+                            isfile=False,
+                            islink=False,
+                            exists=False,
+                            size  =None,
+                            mtime =now,
+                            ctime =now,
+                           )
+        raise
-- 
2.45.2