]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blobdiff - src/allmydata/frontends/sftpd.py
Ensure that path parameters to SFTPServer and FTPServer constructors are unicode...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / sftpd.py
index 0d459f6e6dda0f2319c48b30d74ba07a2a5f7e0d..a18af9c4e1350df96118b03a1ffad8adcb241719 100644 (file)
@@ -1,5 +1,6 @@
 
-import os, tempfile, heapq, binascii, traceback, array, stat, struct
+import heapq, traceback, array, stat, struct
+from types import NoneType
 from stat import S_IFREG, S_IFDIR
 from time import time, strftime, localtime
 
@@ -21,17 +22,19 @@ from twisted.python.failure import Failure
 from twisted.internet.interfaces import ITransport
 
 from twisted.internet import defer
-from twisted.internet.interfaces import IFinishableConsumer
+from twisted.internet.interfaces import IConsumer
 from foolscap.api import eventually
 from allmydata.util import deferredutil
 
+from allmydata.util.assertutil import _assert, precondition
 from allmydata.util.consumer import download_to_data
 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
      NoSuchChildError, ChildOfWrongTypeError
 from allmydata.mutable.common import NotWriteableError
+from allmydata.mutable.publish import MutableFileHandle
 from allmydata.immutable.upload import FileHandle
-
-from pycryptopp.cipher.aes import AES
+from allmydata.dirnode import update_metadata
+from allmydata.util.fileutil import EncryptedTemporaryFile
 
 noisy = True
 use_foolscap_logging = True
@@ -76,6 +79,9 @@ def _to_sftp_time(t):
 
 
 def _convert_error(res, request):
+    """If res is not a Failure, return it, otherwise reraise the appropriate
+    SFTPError."""
+
     if not isinstance(res, Failure):
         logged_res = res
         if isinstance(res, str): logged_res = "<data of length %r>" % (len(res),)
@@ -86,11 +92,11 @@ def _convert_error(res, request):
     logmsg("RAISE %r %r" % (request, err.value), level=OPERATIONAL)
     try:
         if noisy: logmsg(traceback.format_exc(err.value), level=NOISY)
-    except:  # pragma: no cover
+    except Exception:  # pragma: no cover
         pass
 
     # The message argument to SFTPError must not reveal information that
-    # might compromise anonymity.
+    # might compromise anonymity, if we are running over an anonymous network.
 
     if err.check(SFTPError):
         # original raiser of SFTPError has responsibility to ensure anonymity
@@ -138,22 +144,23 @@ def _lsLine(name, attrs):
     st_gid = "tahoe"
     st_mtime = attrs.get("mtime", 0)
     st_mode = attrs["permissions"]
-    # TODO: check that clients are okay with this being a "?".
-    # (They should be because the longname is intended for human
-    # consumption.)
-    st_size = attrs.get("size", "?")
+
+    # Some clients won't tolerate '?' in the size field (#1337).
+    st_size = attrs.get("size", 0)
+
     # We don't know how many links there really are to this object.
     st_nlink = 1
 
-    # Based on <http://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
-    # We can't call the version in Twisted because we might have a version earlier than
-    # <http://twistedmatrix.com/trac/changeset/25412> (released in Twisted 8.2).
+    # Based on <https://twistedmatrix.com/trac/browser/trunk/twisted/conch/ls.py?rev=25412>.
+    # We previously could not call the version in Twisted because we needed the change
+    # <https://twistedmatrix.com/trac/changeset/25412> (released in Twisted v8.2).
+    # Since we now depend on Twisted v10.1, consider calling Twisted's version.
 
     mode = st_mode
     perms = array.array('c', '-'*10)
     ft = stat.S_IFMT(mode)
-    if   stat.S_ISDIR(ft):  perms[0] = 'd'
-    elif stat.S_ISREG(ft):  perms[0] = '-'
+    if   stat.S_ISDIR(ft): perms[0] = 'd'
+    elif stat.S_ISREG(ft): perms[0] = '-'
     else: perms[0] = '?'
     # user
     if mode&stat.S_IRUSR: perms[1] = 'r'
@@ -190,15 +197,17 @@ def _lsLine(name, attrs):
     return l
 
 
-def _is_readonly(parent_readonly, child):
+def _no_write(parent_readonly, child, metadata=None):
     """Whether child should be listed as having read-only permissions in parent."""
 
     if child.is_unknown():
         return True
     elif child.is_mutable():
         return child.is_readonly()
+    elif parent_readonly or IDirectoryNode.providedBy(child):
+        return True
     else:
-        return parent_readonly
+        return metadata is not None and metadata.get('no-write', False)
 
 
 def _populate_attrs(childnode, metadata, size=None):
@@ -208,6 +217,7 @@ def _populate_attrs(childnode, metadata, size=None):
     # bits, otherwise the client may refuse to open a directory.
     # Also, sshfs run as a non-root user requires files and directories
     # to be world-readable/writeable.
+    # It is important that we never set the executable bits on files.
     #
     # Directories and unknown nodes have no size, and SFTP doesn't
     # require us to make one up.
@@ -224,30 +234,25 @@ def _populate_attrs(childnode, metadata, size=None):
         if childnode and size is None:
             size = childnode.get_size()
         if size is not None:
-            assert isinstance(size, (int, long)) and not isinstance(size, bool), repr(size)
+            _assert(isinstance(size, (int, long)) and not isinstance(size, bool), size=size)
             attrs['size'] = size
         perms = S_IFREG | 0666
 
     if metadata:
-        assert 'readonly' in metadata, metadata
-        if metadata['readonly']:
+        if metadata.get('no-write', False):
             perms &= S_IFDIR | S_IFREG | 0555  # clear 'w' bits
 
-        # see webapi.txt for what these times mean
+        # See webapi.txt for what these times mean.
+        # We would prefer to omit atime, but SFTP version 3 can only
+        # accept mtime if atime is also set.
         if 'linkmotime' in metadata.get('tahoe', {}):
-            attrs['mtime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
+            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['tahoe']['linkmotime'])
         elif 'mtime' in metadata:
-            # We would prefer to omit atime, but SFTP version 3 can only
-            # accept mtime if atime is also set.
-            attrs['mtime'] = _to_sftp_time(metadata['mtime'])
-            attrs['atime'] = attrs['mtime']
+            attrs['ctime'] = attrs['mtime'] = attrs['atime'] = _to_sftp_time(metadata['mtime'])
 
         if 'linkcrtime' in metadata.get('tahoe', {}):
             attrs['createtime'] = _to_sftp_time(metadata['tahoe']['linkcrtime'])
 
-        if 'ctime' in metadata:
-            attrs['ctime'] = _to_sftp_time(metadata['ctime'])
-
     attrs['permissions'] = perms
 
     # twisted.conch.ssh.filetransfer only implements SFTP version 3,
@@ -256,58 +261,40 @@ def _populate_attrs(childnode, metadata, size=None):
     return attrs
 
 
-class EncryptedTemporaryFile(PrefixingLogMixin):
-    # not implemented: next, readline, readlines, xreadlines, writelines
+def _attrs_to_metadata(attrs):
+    metadata = {}
 
-    def __init__(self):
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        self.file = tempfile.TemporaryFile()
-        self.key = os.urandom(16)  # AES-128
-
-    def _crypt(self, offset, data):
-        # TODO: use random-access AES (pycryptopp ticket #18)
-        offset_big = offset // 16
-        offset_small = offset % 16
-        iv = binascii.unhexlify("%032x" % offset_big)
-        cipher = AES(self.key, iv=iv)
-        cipher.process("\x00"*offset_small)
-        return cipher.process(data)
+    for key in attrs:
+        if key == "mtime" or key == "ctime" or key == "createtime":
+            metadata[key] = long(attrs[key])
+        elif key.startswith("ext_"):
+            metadata[key] = str(attrs[key])
 
-    def close(self):
-        self.file.close()
+    perms = attrs.get('permissions', stat.S_IWUSR)
+    if not (perms & stat.S_IWUSR):
+        metadata['no-write'] = True
 
-    def flush(self):
-        self.file.flush()
+    return metadata
 
-    def seek(self, offset, whence=os.SEEK_SET):
-        if noisy: self.log(".seek(%r, %r)" % (offset, whence), level=NOISY)
-        self.file.seek(offset, whence)
 
-    def tell(self):
-        offset = self.file.tell()
-        if noisy: self.log(".tell() = %r" % (offset,), level=NOISY)
-        return offset
+def _direntry_for(filenode_or_parent, childname, filenode=None):
+    precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
 
-    def read(self, size=-1):
-        if noisy: self.log(".read(%r)" % (size,), level=NOISY)
-        index = self.file.tell()
-        ciphertext = self.file.read(size)
-        plaintext = self._crypt(index, ciphertext)
-        return plaintext
+    if childname is None:
+        filenode_or_parent = filenode
 
-    def write(self, plaintext):
-        if noisy: self.log(".write(<data of length %r>)" % (len(plaintext),), level=NOISY)
-        index = self.file.tell()
-        ciphertext = self._crypt(index, plaintext)
-        self.file.write(ciphertext)
+    if filenode_or_parent:
+        rw_uri = filenode_or_parent.get_write_uri()
+        if rw_uri and childname:
+            return rw_uri + "/" + childname.encode('utf-8')
+        else:
+            return rw_uri
 
-    def truncate(self, newsize):
-        if noisy: self.log(".truncate(%r)" % (newsize,), level=NOISY)
-        self.file.truncate(newsize)
+    return None
 
 
 class OverwriteableFileConsumer(PrefixingLogMixin):
-    implements(IFinishableConsumer)
+    implements(IConsumer)
     """I act both as a consumer for the download of the original file contents, and as a
     wrapper for a temporary file that records the downloaded data and any overwrites.
     I use a priority queue to keep track of which regions of the file have been overwritten
@@ -334,12 +321,9 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
         self.milestones = []  # empty heap of (offset, d)
         self.overwrites = []  # empty heap of (start, end)
         self.is_closed = False
-        self.done = self.when_reached(download_size)  # adds a milestone
-        self.is_done = False
-        def _signal_done(ign):
-            if noisy: self.log("DONE", level=NOISY)
-            self.is_done = True
-        self.done.addCallback(_signal_done)
+
+        self.done = defer.Deferred()
+        self.done_status = None  # None -> not complete, Failure -> download failed, str -> download succeeded
         self.producer = None
 
     def get_file(self):
@@ -357,21 +341,25 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
             self.overwrite(self.current_size, "\x00" * (size - self.current_size))
         self.current_size = size
 
-        # invariant: self.download_size <= self.current_size
+        # make the invariant self.download_size <= self.current_size be true again
         if size < self.download_size:
             self.download_size = size
+
         if self.downloaded >= self.download_size:
-            self.finish()
+            self.download_done("size changed")
 
     def registerProducer(self, p, streaming):
         if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
+        if self.producer is not None:
+            raise RuntimeError("producer is already registered")
+
         self.producer = p
         if streaming:
             # call resumeProducing once to start things off
             p.resumeProducing()
         else:
             def _iterate():
-                if not self.is_done:
+                if self.done_status is None:
                     p.resumeProducing()
                     eventually(_iterate)
             _iterate()
@@ -438,13 +426,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
                 return
             if noisy: self.log("MILESTONE %r %r" % (next, d), level=NOISY)
             heapq.heappop(self.milestones)
-            eventually_callback(d)(None)
+            eventually_callback(d)("reached")
 
         if milestone >= self.download_size:
-            self.finish()
+            self.download_done("reached download size")
 
     def overwrite(self, offset, data):
         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
+        if self.is_closed:
+            self.log("overwrite called on a closed OverwriteableFileConsumer", level=WEIRD)
+            raise SFTPError(FX_BAD_MESSAGE, "cannot write to a closed file handle")
+
         if offset > self.current_size:
             # Normally writing at an offset beyond the current end-of-file
             # would leave a hole that appears filled with zeroes. However, an
@@ -472,6 +464,15 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
         The caller must perform no more overwrites until the Deferred has fired."""
 
         if noisy: self.log(".read(%r, %r), current_size = %r" % (offset, length, self.current_size), level=NOISY)
+        if self.is_closed:
+            self.log("read called on a closed OverwriteableFileConsumer", level=WEIRD)
+            raise SFTPError(FX_BAD_MESSAGE, "cannot read from a closed file handle")
+
+        # Note that the overwrite method is synchronous. When a write request is processed
+        # (e.g. a writeChunk request on the async queue of GeneralSFTPFile), overwrite will
+        # be called and will update self.current_size if necessary before returning. Therefore,
+        # self.current_size will be up-to-date for a subsequent call to this read method, and
+        # so it is correct to do the check for a read past the end-of-file here.
         if offset >= self.current_size:
             def _eof(): raise EOFError("read past end of file")
             return defer.execute(_eof)
@@ -481,61 +482,84 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
             if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
 
         needed = min(offset + length, self.download_size)
-        d = self.when_reached(needed)
-        def _reached(ign):
+
+        # If we fail to reach the needed number of bytes, the read request will fail.
+        d = self.when_reached_or_failed(needed)
+        def _reached_in_read(res):
             # It is not necessarily the case that self.downloaded >= needed, because
             # the file might have been truncated (thus truncating the download) and
             # then extended.
 
-            assert self.current_size >= offset + length, (self.current_size, offset, length)
-            if noisy: self.log("self.f = %r" % (self.f,), level=NOISY)
+            _assert(self.current_size >= offset + length,
+                    current_size=self.current_size, offset=offset, length=length)
+            if noisy: self.log("_reached_in_read(%r), self.f = %r" % (res, self.f,), level=NOISY)
             self.f.seek(offset)
             return self.f.read(length)
-        d.addCallback(_reached)
+        d.addCallback(_reached_in_read)
         return d
 
-    def when_reached(self, index):
-        if noisy: self.log(".when_reached(%r)" % (index,), level=NOISY)
-        if index <= self.downloaded:  # already reached
-            if noisy: self.log("already reached %r" % (index,), level=NOISY)
-            return defer.succeed(None)
+    def when_reached_or_failed(self, index):
+        if noisy: self.log(".when_reached_or_failed(%r)" % (index,), level=NOISY)
+        def _reached(res):
+            if noisy: self.log("reached %r with result %r" % (index, res), level=NOISY)
+            return res
+
+        if self.done_status is not None:
+            return defer.execute(_reached, self.done_status)
+        if index <= self.downloaded:  # already reached successfully
+            if noisy: self.log("already reached %r successfully" % (index,), level=NOISY)
+            return defer.succeed("already reached successfully")
         d = defer.Deferred()
-        def _reached(ign):
-            if noisy: self.log("reached %r" % (index,), level=NOISY)
-            return ign
         d.addCallback(_reached)
         heapq.heappush(self.milestones, (index, d))
         return d
 
     def when_done(self):
-        return self.done
+        d = defer.Deferred()
+        self.done.addCallback(lambda ign: eventually_callback(d)(self.done_status))
+        return d
+
+    def download_done(self, res):
+        _assert(isinstance(res, (str, Failure)), res=res)
+        # Only the first call to download_done counts, but we log subsequent calls
+        # (multiple calls are normal).
+        if self.done_status is not None:
+            self.log("IGNORING extra call to download_done with result %r; previous result was %r"
+                     % (res, self.done_status), level=OPERATIONAL)
+            return
+
+        self.log("DONE with result %r" % (res,), level=OPERATIONAL)
+
+        # We avoid errbacking self.done so that we are not left with an 'Unhandled error in Deferred'
+        # in case when_done() is never called. Instead we stash the failure in self.done_status,
+        # from where the callback added in when_done() can retrieve it.
+        self.done_status = res
+        eventually_callback(self.done)(None)
 
-    def finish(self):
         while len(self.milestones) > 0:
             (next, d) = self.milestones[0]
-            if noisy: self.log("MILESTONE FINISH %r %r" % (next, d), level=NOISY)
+            if noisy: self.log("MILESTONE FINISH %r %r %r" % (next, d, res), level=NOISY)
             heapq.heappop(self.milestones)
             # The callback means that the milestone has been reached if
             # it is ever going to be. Note that the file may have been
             # truncated to before the milestone.
-            eventually_callback(d)(None)
-
-        # FIXME: causes spurious failures
-        #self.unregisterProducer()
+            eventually_callback(d)(res)
 
     def close(self):
-        self.is_closed = True
-        self.finish()
         if not self.is_closed:
+            self.is_closed = True
             try:
                 self.f.close()
-            except BaseException as e:
+            except Exception, e:
                 self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
+        self.download_done("closed")
+        return self.done_status
 
     def unregisterProducer(self):
-        if self.producer:
-            self.producer.stopProducing()
-            self.producer = None
+        # This will happen just before our client calls download_done, which will tell
+        # us the outcome of the download; we don't know the outcome at this point.
+        self.producer = None
+        self.log("producer unregistered", level=NOISY)
 
 
 SIZE_THRESHOLD = 1000
@@ -545,13 +569,15 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
     implements(ISFTPFile)
     """I represent a file handle to a particular file on an SFTP connection.
     I am used only for short immutable files opened in read-only mode.
-    The file contents are downloaded to memory when I am created."""
+    When I am created, the file contents start to be downloaded to memory.
+    self.async is used to delay read requests until the download has finished."""
 
     def __init__(self, userpath, filenode, metadata):
         PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
         if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
 
-        assert IFileNode.providedBy(filenode), filenode
+        precondition(isinstance(userpath, str) and IFileNode.providedBy(filenode),
+                     userpath=userpath, filenode=filenode)
         self.filenode = filenode
         self.metadata = metadata
         self.async = download_to_data(filenode)
@@ -579,9 +605,9 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
             # i.e. we respond with an EOF error iff offset is already at EOF.
 
             if offset >= len(data):
-                eventually_errback(d)(SFTPError(FX_EOF, "read at or past end of file"))
+                eventually_errback(d)(Failure(SFTPError(FX_EOF, "read at or past end of file")))
             else:
-                eventually_callback(d)(data[offset:min(offset+length, len(data))])
+                eventually_callback(d)(data[offset:offset+length])  # truncated if offset+length > len(data)
             return data
         self.async.addCallbacks(_read, eventually_errback(d))
         d.addBoth(_convert_error, request)
@@ -634,6 +660,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
                            (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
 
+        precondition(isinstance(userpath, str), userpath=userpath)
         self.userpath = userpath
         self.flags = flags
         self.close_notify = close_notify
@@ -656,6 +683,10 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                  (parent, childname, filenode, metadata), level=OPERATIONAL)
 
+        precondition(isinstance(childname, (unicode, NoneType)), childname=childname)
+        precondition(filenode is None or IFileNode.providedBy(filenode), filenode=filenode)
+        precondition(not self.closed, sftpfile=self)
+
         # If the file has been renamed, the new (parent, childname) takes precedence.
         if self.parent is None:
             self.parent = parent
@@ -664,42 +695,47 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.filenode = filenode
         self.metadata = metadata
 
-        if not self.closed:
-            tempfile_maker = EncryptedTemporaryFile
+        tempfile_maker = EncryptedTemporaryFile
 
-            if (self.flags & FXF_TRUNC) or not filenode:
-                # We're either truncating or creating the file, so we don't need the old contents.
-                self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
-                self.consumer.finish()
-            else:
-                assert IFileNode.providedBy(filenode), filenode
-
-                # TODO: use download interface described in #993 when implemented.
-                if filenode.is_mutable():
-                    self.async.addCallback(lambda ign: filenode.download_best_version())
-                    def _downloaded(data):
-                        self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
-                        self.consumer.write(data)
-                        self.consumer.finish()
-                        return None
-                    self.async.addCallback(_downloaded)
-                else:
-                    download_size = filenode.get_size()
-                    assert download_size is not None, "download_size is None"
-                    self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
-                    def _read(ign):
-                        if noisy: self.log("_read immutable", level=NOISY)
-                        filenode.read(self.consumer, 0, None)
-                    self.async.addCallback(_read)
+        if (self.flags & FXF_TRUNC) or not filenode:
+            # We're either truncating or creating the file, so we don't need the old contents.
+            self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
+            self.consumer.download_done("download not needed")
+        else:
+            self.async.addCallback(lambda ignored: filenode.get_best_readable_version())
+
+            def _read(version):
+                if noisy: self.log("_read", level=NOISY)
+                download_size = version.get_size()
+                _assert(download_size is not None)
+
+                self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
+
+                d = version.read(self.consumer, 0, None)
+                def _finished(res):
+                    if not isinstance(res, Failure):
+                        res = "download finished"
+                    self.consumer.download_done(res)
+                d.addBoth(_finished)
+                # It is correct to drop d here.
+            self.async.addCallback(_read)
 
         eventually_callback(self.async)(None)
 
         if noisy: self.log("open done", level=NOISY)
         return self
 
+    def get_userpath(self):
+        return self.userpath
+
+    def get_direntry(self):
+        return _direntry_for(self.parent, self.childname)
+
     def rename(self, new_userpath, new_parent, new_childname):
         self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
 
+        precondition(isinstance(new_userpath, str) and isinstance(new_childname, unicode),
+                     new_userpath=new_userpath, new_childname=new_childname)
         self.userpath = new_userpath
         self.parent = new_parent
         self.childname = new_childname
@@ -709,11 +745,16 @@ class GeneralSFTPFile(PrefixingLogMixin):
 
         self.abandoned = True
 
-    def sync(self):
+    def sync(self, ign=None):
+        # The ign argument allows some_file.sync to be used as a callback.
         self.log(".sync()", level=OPERATIONAL)
 
         d = defer.Deferred()
         self.async.addBoth(eventually_callback(d))
+        def _done(res):
+            if noisy: self.log("_done(%r) in .sync()" % (res,), level=NOISY)
+            return res
+        d.addBoth(_done)
         return d
 
     def readChunk(self, offset, length):
@@ -732,7 +773,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         def _read(ign):
             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
             d2 = self.consumer.read(offset, length)
-            d2.addCallbacks(eventually_callback(d), eventually_errback(d))
+            d2.addBoth(eventually_callback(d))
             # It is correct to drop d2 here.
             return None
         self.async.addCallbacks(_read, eventually_errback(d))
@@ -776,6 +817,24 @@ class GeneralSFTPFile(PrefixingLogMixin):
         # don't addErrback to self.async, just allow subsequent async ops to fail.
         return defer.succeed(None)
 
+    def _do_close(self, res, d=None):
+        if noisy: self.log("_do_close(%r)" % (res,), level=NOISY)
+        status = None
+        if self.consumer:
+            status = self.consumer.close()
+
+        # We must close_notify before re-firing self.async.
+        if self.close_notify:
+            self.close_notify(self.userpath, self.parent, self.childname, self)
+
+        if not isinstance(res, Failure) and isinstance(status, Failure):
+            res = status
+
+        if d:
+            eventually_callback(d)(res)
+        elif isinstance(res, Failure):
+            self.log("suppressing %r" % (res,), level=OPERATIONAL)
+
     def close(self):
         request = ".close()"
         self.log(request, level=OPERATIONAL)
@@ -787,61 +846,56 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.closed = True
 
         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
-            def _readonly_close():
-                if self.consumer:
-                    self.consumer.close()
-            return defer.execute(_readonly_close)
+            # We never fail a close of a handle opened only for reading, even if the file
+            # failed to download. (We could not do so deterministically, because it would
+            # depend on whether we reached the point of failure before abandoning the
+            # download.) Any reads that depended on file content that could not be downloaded
+            # will have failed. It is important that we don't close the consumer until
+            # previous read operations have completed.
+            self.async.addBoth(self._do_close)
+            return defer.succeed(None)
 
         # We must capture the abandoned, parent, and childname variables synchronously
         # at the close call. This is needed by the correctness arguments in the comments
         # for _abandon_any_heisenfiles and _rename_heisenfiles.
+        # Note that the file must have been opened before it can be closed.
         abandoned = self.abandoned
         parent = self.parent
         childname = self.childname
-        
+
         # has_changed is set when writeChunk is called, not when the write occurs, so
         # it is correct to optimize out the commit if it is False at the close call.
         has_changed = self.has_changed
 
-        def _committed(res):
-            if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
-
-            self.consumer.close()
-
-            # We must close_notify before re-firing self.async.
-            if self.close_notify:
-                self.close_notify(self.userpath, self.parent, self.childname, self)
-            return res
-
-        def _close(ign):
+        def _commit(ign):
             d2 = self.consumer.when_done()
             if self.filenode and self.filenode.is_mutable():
-                self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
-                d2.addCallback(lambda ign: self.consumer.get_current_size())
-                d2.addCallback(lambda size: self.consumer.read(0, size))
-                d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
+                self.log("update mutable file %r childname=%r metadata=%r"
+                         % (self.filenode, childname, self.metadata), level=OPERATIONAL)
+                if self.metadata.get('no-write', False) and not self.filenode.is_readonly():
+                    _assert(parent and childname, parent=parent, childname=childname, metadata=self.metadata)
+                    d2.addCallback(lambda ign: parent.set_metadata_for(childname, self.metadata))
+
+                d2.addCallback(lambda ign: self.filenode.overwrite(MutableFileHandle(self.consumer.get_file())))
             else:
                 def _add_file(ign):
                     self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                     u = FileHandle(self.consumer.get_file(), self.convergence)
-                    return parent.add_file(childname, u)
+                    return parent.add_file(childname, u, metadata=self.metadata)
                 d2.addCallback(_add_file)
-
-            d2.addBoth(_committed)
             return d2
 
-        d = defer.Deferred()
-
         # If the file has been abandoned, we don't want the close operation to get "stuck",
-        # even if self.async fails to re-fire. Doing the close independently of self.async
-        # in that case ensures that dropping an ssh connection is sufficient to abandon
+        # even if self.async fails to re-fire. Completing the close independently of self.async
+        # in that case should ensure that dropping an ssh connection is sufficient to abandon
         # any heisenfiles that were not explicitly closed in that connection.
         if abandoned or not has_changed:
-            d.addCallback(_committed)
+            d = defer.succeed(None)
+            self.async.addBoth(self._do_close)
         else:
-            self.async.addCallback(_close)
-
-        self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
+            d = defer.Deferred()
+            self.async.addCallback(_commit)
+            self.async.addBoth(self._do_close, d)
         d.addBoth(_convert_error, request)
         return d
 
@@ -854,11 +908,13 @@ class GeneralSFTPFile(PrefixingLogMixin):
             return defer.execute(_closed)
 
         # Optimization for read-only handles, when we already know the metadata.
-        if not(self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
+        if not (self.flags & (FXF_WRITE | FXF_CREAT)) and self.metadata and self.filenode and not self.filenode.is_mutable():
             return defer.succeed(_populate_attrs(self.filenode, self.metadata))
 
         d = defer.Deferred()
         def _get(ign):
+            if noisy: self.log("_get(%r) in %r, filenode = %r, metadata = %r" % (ign, request, self.filenode, self.metadata), level=NOISY)
+
             # self.filenode might be None, but that's ok.
             attrs = _populate_attrs(self.filenode, self.metadata, size=self.consumer.get_current_size())
             eventually_callback(d)(attrs)
@@ -867,8 +923,8 @@ class GeneralSFTPFile(PrefixingLogMixin):
         d.addBoth(_convert_error, request)
         return d
 
-    def setAttrs(self, attrs):
-        request = ".setAttrs(attrs) %r" % (attrs,)
+    def setAttrs(self, attrs, only_if_at=None):
+        request = ".setAttrs(%r, only_if_at=%r)" % (attrs, only_if_at)
         self.log(request, level=OPERATIONAL)
 
         if not (self.flags & FXF_WRITE):
@@ -879,20 +935,29 @@ class GeneralSFTPFile(PrefixingLogMixin):
             def _closed(): raise SFTPError(FX_BAD_MESSAGE, "cannot set attributes for a closed file handle")
             return defer.execute(_closed)
 
-        if not "size" in attrs:
-            return defer.succeed(None)
-
-        size = attrs["size"]
-        if not isinstance(size, (int, long)) or size < 0:
+        size = attrs.get("size", None)
+        if size is not None and (not isinstance(size, (int, long)) or size < 0):
             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "new size is not a valid nonnegative integer")
             return defer.execute(_bad)
 
         d = defer.Deferred()
-        def _resize(ign):
-            self.consumer.set_current_size(size)
+        def _set(ign):
+            if noisy: self.log("_set(%r) in %r" % (ign, request), level=NOISY)
+            current_direntry = _direntry_for(self.parent, self.childname, self.filenode)
+            if only_if_at and only_if_at != current_direntry:
+                if noisy: self.log("not setting attributes: current_direntry=%r in %r" %
+                                   (current_direntry, request), level=NOISY)
+                return None
+
+            now = time()
+            self.metadata = update_metadata(self.metadata, _attrs_to_metadata(attrs), now)
+            if size is not None:
+                # TODO: should we refuse to truncate a file opened with FXF_APPEND?
+                # <http://allmydata.org/trac/tahoe-lafs/ticket/1037#comment:20>
+                self.consumer.set_current_size(size)
             eventually_callback(d)(None)
             return None
-        self.async.addCallbacks(_resize, eventually_errback(d))
+        self.async.addCallbacks(_set, eventually_errback(d))
         d.addBoth(_convert_error, request)
         return d
 
@@ -914,14 +979,18 @@ class Reason:
 
 # A "heisenfile" is a file that has been opened with write flags
 # (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
-# 'all_heisenfiles' maps from a direntry string to
-# (list_of_GeneralSFTPFile, open_time_utc).
+# 'all_heisenfiles' maps from a direntry string to a list of
+# GeneralSFTPFile.
+#
 # A direntry string is parent_write_uri + "/" + childname_utf8 for
 # an immutable file, or file_write_uri for a mutable file.
 # Updates to this dict are single-threaded.
 
 all_heisenfiles = {}
 
+def _reload():
+    global all_heisenfiles
+    all_heisenfiles = {}
 
 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     implements(ISFTPServer)
@@ -958,26 +1027,30 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             for f in files:
                 f.abandon()
 
-    def _add_heisenfiles_by_path(self, userpath, files):
-        if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY)
+    def _add_heisenfile_by_path(self, file):
+        self.log("._add_heisenfile_by_path(%r)" % (file,), level=OPERATIONAL)
 
+        userpath = file.get_userpath()
         if userpath in self._heisenfiles:
-            self._heisenfiles[userpath] += files
+            self._heisenfiles[userpath] += [file]
         else:
-            self._heisenfiles[userpath] = files
+            self._heisenfiles[userpath] = [file]
 
-    def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
-        if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY)
+    def _add_heisenfile_by_direntry(self, file):
+        self.log("._add_heisenfile_by_direntry(%r)" % (file,), level=OPERATIONAL)
 
+        direntry = file.get_direntry()
         if direntry:
             if direntry in all_heisenfiles:
-                (old_files, opentime) = all_heisenfiles[direntry]
-                all_heisenfiles[direntry] = (old_files + files_to_add, opentime)
+                all_heisenfiles[direntry] += [file]
             else:
-                all_heisenfiles[direntry] = (files_to_add, time())
+                all_heisenfiles[direntry] = [file]
 
     def _abandon_any_heisenfiles(self, userpath, direntry):
-        if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
+        request = "._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry)
+        self.log(request, level=OPERATIONAL)
+
+        precondition(isinstance(userpath, str), userpath=userpath)
 
         # First we synchronously mark all heisenfiles matching the userpath or direntry
         # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
@@ -999,27 +1072,38 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         files = []
         if direntry in all_heisenfiles:
-            (files, opentime) = all_heisenfiles[direntry]
+            files = all_heisenfiles[direntry]
             del all_heisenfiles[direntry]
         if userpath in self._heisenfiles:
             files += self._heisenfiles[userpath]
             del self._heisenfiles[userpath]
 
+        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
+
         for f in files:
             f.abandon()
 
         d = defer.succeed(None)
         for f in files:
-            d.addBoth(lambda ign: f.sync())
+            d.addBoth(f.sync)
 
-        d.addBoth(lambda ign: len(files) > 0)
+        def _done(ign):
+            self.log("done %r" % (request,), level=OPERATIONAL)
+            return len(files) > 0
+        d.addBoth(_done)
         return d
 
     def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
                             to_userpath, to_parent, to_childname, overwrite=True):
-        if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
-                           (from_userpath, from_parent, from_childname,
-                            to_userpath, to_parent, to_childname, overwrite), level=NOISY)
+        request = ("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
+                   (from_userpath, from_parent, from_childname, to_userpath, to_parent, to_childname, overwrite))
+        self.log(request, level=OPERATIONAL)
+
+        precondition((isinstance(from_userpath, str) and isinstance(from_childname, unicode) and
+                      isinstance(to_userpath, str) and isinstance(to_childname, unicode)),
+                     from_userpath=from_userpath, from_childname=from_childname, to_userpath=to_userpath, to_childname=to_childname)
+
+        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
 
         # First we synchronously rename all heisenfiles matching the userpath or direntry.
         # Then we .sync() each file that we renamed.
@@ -1047,48 +1131,95 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # is False and there were already heisenfiles at the destination userpath or
         # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
 
-        from_direntry = self._direntry_for(from_parent, from_childname)
-        to_direntry = self._direntry_for(to_parent, to_childname)
+        from_direntry = _direntry_for(from_parent, from_childname)
+        to_direntry = _direntry_for(to_parent, to_childname)
+
+        if noisy: self.log("from_direntry = %r, to_direntry = %r, len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+                           (from_direntry, to_direntry, len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
 
         if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
             def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+            if noisy: self.log("existing", level=NOISY)
             return defer.execute(_existing)
 
         from_files = []
         if from_direntry in all_heisenfiles:
-            (from_files, opentime) = all_heisenfiles[from_direntry]
+            from_files = all_heisenfiles[from_direntry]
             del all_heisenfiles[from_direntry]
         if from_userpath in self._heisenfiles:
             from_files += self._heisenfiles[from_userpath]
             del self._heisenfiles[from_userpath]
 
-        self._add_heisenfiles_by_direntry(to_direntry, from_files)
-        self._add_heisenfiles_by_path(to_userpath, from_files)
+        if noisy: self.log("from_files = %r in %r" % (from_files, request), level=NOISY)
 
         for f in from_files:
             f.rename(to_userpath, to_parent, to_childname)
+            self._add_heisenfile_by_path(f)
+            self._add_heisenfile_by_direntry(f)
 
         d = defer.succeed(None)
         for f in from_files:
-            d.addBoth(lambda ign: f.sync())
+            d.addBoth(f.sync)
+
+        def _done(ign):
+            if noisy: self.log("done: len(all_heisenfiles) = %r, len(self._heisenfiles) = %r in %r" %
+                               (len(all_heisenfiles), len(self._heisenfiles), request), level=NOISY)
+            return len(from_files) > 0
+        d.addBoth(_done)
+        return d
+
+    def _update_attrs_for_heisenfiles(self, userpath, direntry, attrs):
+        request = "._update_attrs_for_heisenfiles(%r, %r, %r)" % (userpath, direntry, attrs)
+        self.log(request, level=OPERATIONAL)
+
+        _assert(isinstance(userpath, str) and isinstance(direntry, str),
+                userpath=userpath, direntry=direntry)
+
+        files = []
+        if direntry in all_heisenfiles:
+            files = all_heisenfiles[direntry]
+        if userpath in self._heisenfiles:
+            files += self._heisenfiles[userpath]
 
-        d.addBoth(lambda ign: len(from_files) > 0)
+        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
+
+        # We set the metadata for all heisenfiles at this path or direntry.
+        # Since a direntry includes a write URI, we must have authority to
+        # change the metadata of heisenfiles found in the all_heisenfiles dict.
+        # However that's not necessarily the case for heisenfiles found by
+        # path. Therefore we tell the setAttrs method of each file to only
+        # perform the update if the file is at the correct direntry.
+
+        d = defer.succeed(None)
+        for f in files:
+            d.addBoth(f.setAttrs, attrs, only_if_at=direntry)
+
+        def _done(ign):
+            self.log("done %r" % (request,), level=OPERATIONAL)
+            # TODO: this should not return True if only_if_at caused all files to be skipped.
+            return len(files) > 0
+        d.addBoth(_done)
         return d
 
     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
         request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
         self.log(request, level=OPERATIONAL)
 
+        _assert(isinstance(userpath, str) and isinstance(direntry, (str, NoneType)),
+                userpath=userpath, direntry=direntry)
+
         files = []
         if direntry in all_heisenfiles:
-            (files, opentime) = all_heisenfiles[direntry]
+            files = all_heisenfiles[direntry]
         if userpath in self._heisenfiles:
             files += self._heisenfiles[userpath]
 
+        if noisy: self.log("files = %r in %r" % (files, request), level=NOISY)
+
         d = defer.succeed(None)
         for f in files:
             if f is not ignore:
-                d.addBoth(lambda ign: f.sync())
+                d.addBoth(f.sync)
 
         def _done(ign):
             self.log("done %r" % (request,), level=OPERATIONAL)
@@ -1097,14 +1228,17 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         return d
 
     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
-        if noisy: self.log("._remove_file(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+        if noisy: self.log("._remove_heisenfile(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+
+        _assert(isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)),
+                userpath=userpath, childname=childname)
 
-        direntry = self._direntry_for(parent, childname)
+        direntry = _direntry_for(parent, childname)
         if direntry in all_heisenfiles:
-            (all_old_files, opentime) = all_heisenfiles[direntry]
+            all_old_files = all_heisenfiles[direntry]
             all_new_files = [f for f in all_old_files if f is not file_to_remove]
             if len(all_new_files) > 0:
-                all_heisenfiles[direntry] = (all_new_files, opentime)
+                all_heisenfiles[direntry] = all_new_files
             else:
                 del all_heisenfiles[direntry]
 
@@ -1116,28 +1250,19 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             else:
                 del self._heisenfiles[userpath]
 
-    def _direntry_for(self, filenode_or_parent, childname=None):
-        if filenode_or_parent:
-            rw_uri = filenode_or_parent.get_write_uri()
-            if rw_uri and childname:
-                return rw_uri + "/" + childname.encode('utf-8')
-            else:
-                return rw_uri
-
-        return None
+        if noisy: self.log("all_heisenfiles = %r\nself._heisenfiles = %r" % (all_heisenfiles, self._heisenfiles), level=NOISY)
 
     def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
         if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
                            (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
                            level=NOISY)
 
-        assert metadata is None or 'readonly' in metadata, metadata
+        _assert((isinstance(userpath, str) and isinstance(childname, (unicode, NoneType)) and
+                (metadata is None or 'no-write' in metadata)),
+                userpath=userpath, childname=childname, metadata=metadata)
 
         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
-        if childname:
-            direntry = self._direntry_for(parent, childname)
-        else:
-            direntry = self._direntry_for(filenode)
+        direntry = _direntry_for(parent, childname, filenode)
 
         d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
 
@@ -1150,14 +1275,15 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
             d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
             def _got_file(file):
+                file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
                 if writing:
-                    self._add_heisenfiles_by_direntry(direntry, [file])
-                return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+                    self._add_heisenfile_by_direntry(file)
+                return file
             d.addCallback(_got_file)
         return d
 
-    def openFile(self, pathstring, flags, attrs):
-        request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
+    def openFile(self, pathstring, flags, attrs, delay=None):
+        request = ".openFile(%r, %r = %r, %r, delay=%r)" % (pathstring, flags, _repr_flags(flags), attrs, delay)
         self.log(request, level=OPERATIONAL)
 
         # This is used for both reading and writing.
@@ -1192,11 +1318,13 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         if flags & (FXF_WRITE | FXF_CREAT):
             file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
-            self._add_heisenfiles_by_path(userpath, [file])
+            self._add_heisenfile_by_path(file)
         else:
             # We haven't decided which file implementation to use yet.
             file = None
 
+        desired_metadata = _attrs_to_metadata(attrs)
+
         # Now there are two major cases:
         #
         #  1. The path is specified as /uri/FILECAP, with no parent directory.
@@ -1227,11 +1355,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # Note that the permission checks below are for more precise error reporting on
         # the open call; later operations would fail even if we did not make these checks.
 
-        d = self._get_root(path)
+        d = delay or defer.succeed(None)
+        d.addCallback(lambda ign: self._get_root(path))
         def _got_root( (root, path) ):
             if root.is_unknown():
                 raise SFTPError(FX_PERMISSION_DENIED,
-                                "cannot open an unknown cap (or child of an unknown directory). "
+                                "cannot open an unknown cap (or child of an unknown object). "
                                 "Upgrading the gateway to a later Tahoe-LAFS version may help")
             if not path:
                 # case 1
@@ -1249,18 +1378,29 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                 # The file does not need to be added to all_heisenfiles, because it is not
                 # associated with a directory entry that needs to be updated.
 
-                return self._make_file(file, userpath, flags, filenode=root)
+                metadata = update_metadata(None, desired_metadata, time())
+
+                # We have to decide what to pass for the 'parent_readonly' argument to _no_write,
+                # given that we don't actually have a parent. This only affects the permissions
+                # reported by a getAttrs on this file handle in the case of an immutable file.
+                # We choose 'parent_readonly=True' since that will cause the permissions to be
+                # reported as r--r--r--, which is appropriate because an immutable file can't be
+                # written via this path.
+
+                metadata['no-write'] = _no_write(True, root)
+                return self._make_file(file, userpath, flags, filenode=root, metadata=metadata)
             else:
                 # case 2
                 childname = path[-1]
-                if noisy: self.log("case 2: root = %r, childname = %r, path[:-1] = %r" %
-                                   (root, childname, path[:-1]), level=NOISY)
+
+                if noisy: self.log("case 2: root = %r, childname = %r, desired_metadata = %r, path[:-1] = %r" %
+                                   (root, childname, desired_metadata, path[:-1]), level=NOISY)
                 d2 = root.get_child_at_path(path[:-1])
                 def _got_parent(parent):
                     if noisy: self.log("_got_parent(%r)" % (parent,), level=NOISY)
                     if parent.is_unknown():
                         raise SFTPError(FX_PERMISSION_DENIED,
-                                        "cannot open an unknown cap (or child of an unknown directory). "
+                                        "cannot open a child of an unknown object. "
                                         "Upgrading the gateway to a later Tahoe-LAFS version may help")
 
                     parent_readonly = parent.is_readonly()
@@ -1283,7 +1423,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                         zero_length_lit = "URI:LIT:"
                         if noisy: self.log("%r.set_uri(%r, None, readcap=%r, overwrite=False)" %
                                            (parent, zero_length_lit, childname), level=NOISY)
-                        d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit, overwrite=False))
+                        d3.addCallback(lambda ign: parent.set_uri(childname, None, readcap=zero_length_lit,
+                                                                  metadata=desired_metadata, overwrite=False))
                         def _seturi_done(child):
                             if noisy: self.log("%r.get_metadata_for(%r)" % (parent, childname), level=NOISY)
                             d4 = parent.get_metadata_for(childname)
@@ -1294,8 +1435,14 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                         if noisy: self.log("%r.get_child_and_metadata(%r)" % (parent, childname), level=NOISY)
                         d3.addCallback(lambda ign: parent.get_child_and_metadata(childname))
 
-                    def _got_child( (filenode, metadata) ):
-                        if noisy: self.log("_got_child( (%r, %r) )" % (filenode, metadata), level=NOISY)
+                    def _got_child( (filenode, current_metadata) ):
+                        if noisy: self.log("_got_child( (%r, %r) )" % (filenode, current_metadata), level=NOISY)
+
+                        metadata = update_metadata(current_metadata, desired_metadata, time())
+
+                        # Ignore the permissions of the desired_metadata in an open call. The permissions
+                        # can only be set by setAttrs.
+                        metadata['no-write'] = _no_write(parent_readonly, filenode, current_metadata)
 
                         if filenode.is_unknown():
                             raise SFTPError(FX_PERMISSION_DENIED,
@@ -1304,14 +1451,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                         if not IFileNode.providedBy(filenode):
                             raise SFTPError(FX_PERMISSION_DENIED,
                                             "cannot open a directory as if it were a file")
-                        if (flags & FXF_WRITE) and filenode.is_mutable() and filenode.is_readonly():
+                        if (flags & FXF_WRITE) and metadata['no-write']:
                             raise SFTPError(FX_PERMISSION_DENIED,
-                                            "cannot open a read-only mutable file for writing")
-                        if (flags & FXF_WRITE) and parent_readonly:
-                            raise SFTPError(FX_PERMISSION_DENIED,
-                                            "cannot open a file for writing when the parent directory is read-only")
+                                            "cannot open a non-writeable file for writing")
 
-                        metadata['readonly'] = _is_readonly(parent_readonly, filenode)
                         return self._make_file(file, userpath, flags, parent=parent, childname=childname,
                                                filenode=filenode, metadata=metadata)
                     def _no_child(f):
@@ -1372,17 +1515,17 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             # For the standard SSH_FXP_RENAME operation, overwrite=False.
             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
 
-            d2 = defer.fail(NoSuchChildError())
+            d2 = defer.succeed(None)
             if not overwrite:
                 d2.addCallback(lambda ign: to_parent.get(to_childname))
-            def _expect_fail(res):
-                if not isinstance(res, Failure):
-                    raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+                def _expect_fail(res):
+                    if not isinstance(res, Failure):
+                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
 
-                # It is OK if we fail for errors other than NoSuchChildError, since that probably
-                # indicates some problem accessing the destination directory.
-                res.trap(NoSuchChildError)
-            d2.addBoth(_expect_fail)
+                    # It is OK if we fail for errors other than NoSuchChildError, since that probably
+                    # indicates some problem accessing the destination directory.
+                    res.trap(NoSuchChildError)
+                d2.addBoth(_expect_fail)
 
             # If there are heisenfiles to be written at the 'from' direntry, then ensure
             # they will now be written at the 'to' direntry instead.
@@ -1418,7 +1561,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         self.log(request, level=OPERATIONAL)
 
         path = self._path_from_string(pathstring)
-        metadata = self._attrs_to_metadata(attrs)
+        metadata = _attrs_to_metadata(attrs)
+        if 'no-write' in metadata:
+            def _denied(): raise SFTPError(FX_PERMISSION_DENIED, "cannot create a directory that is initially read-only")
+            return defer.execute(_denied)
+
         d = self._get_root(path)
         d.addCallback(lambda (root, path):
                       self._get_or_create_directories(root, path, metadata))
@@ -1467,7 +1614,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             if childname is None:
                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
 
-            direntry = self._direntry_for(parent, childname)
+            direntry = _direntry_for(parent, childname)
             d2 = defer.succeed(False)
             if not must_be_directory:
                 d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
@@ -1508,7 +1655,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                 results = []
                 for filename, (child, metadata) in children.iteritems():
                     # The file size may be cached or absent.
-                    metadata['readonly'] = _is_readonly(parent_readonly, child)
+                    metadata['no-write'] = _no_write(parent_readonly, child, metadata)
                     attrs = _populate_attrs(child, metadata)
                     filename_utf8 = filename.encode('utf-8')
                     longname = _lsLine(filename_utf8, attrs)
@@ -1529,52 +1676,47 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # reported as the update time of the best version. But that
         # information isn't currently stored in mutable shares, I think.
 
-        # Some clients will incorrectly try to get the attributes
-        # of a file immediately after opening it, before it has been put
-        # into the all_heisenfiles table. This is a race condition bug in
-        # the client, but we probably need to handle it anyway.
-
         path = self._path_from_string(pathstring)
         userpath = self._path_to_utf8(path)
         d = self._get_parent_or_node(path)
         def _got_parent_or_node( (parent_or_node, childname) ):
             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
 
-            direntry = self._direntry_for(parent_or_node, childname)
+            # Some clients will incorrectly try to get the attributes
+            # of a file immediately after opening it, before it has been put
+            # into the all_heisenfiles table. This is a race condition bug in
+            # the client, but we handle it anyway by calling .sync() on all
+            # files matching either the path or the direntry.
+
+            direntry = _direntry_for(parent_or_node, childname)
             d2 = self._sync_heisenfiles(userpath, direntry)
 
             if childname is None:
                 node = parent_or_node
                 d2.addCallback(lambda ign: node.get_current_size())
                 d2.addCallback(lambda size:
-                               _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
+                               _populate_attrs(node, {'no-write': node.is_unknown() or node.is_readonly()}, size=size))
             else:
                 parent = parent_or_node
                 d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
                 def _got( (child, metadata) ):
                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
-                    assert IDirectoryNode.providedBy(parent), parent
-                    metadata['readonly'] = _is_readonly(parent.is_readonly(), child)
+                    _assert(IDirectoryNode.providedBy(parent), parent=parent)
+                    metadata['no-write'] = _no_write(parent.is_readonly(), child, metadata)
                     d3 = child.get_current_size()
                     d3.addCallback(lambda size: _populate_attrs(child, metadata, size=size))
                     return d3
                 def _nosuch(err):
                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
                     err.trap(NoSuchChildError)
-                    direntry = self._direntry_for(parent, childname)
                     if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
                                        (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
                     if direntry in all_heisenfiles:
-                        (files, opentime) = all_heisenfiles[direntry]
-                        sftptime = _to_sftp_time(opentime)
-                        # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
-                        return {'permissions': S_IFREG | 0666,
-                                'size': 0,
-                                'createtime': sftptime,
-                                'ctime': sftptime,
-                                'mtime': sftptime,
-                                'atime': sftptime,
-                               }
+                        files = all_heisenfiles[direntry]
+                        if len(files) == 0:  # pragma: no cover
+                            return err
+                        # use the heisenfile that was most recently opened
+                        return files[-1].getAttrs()
                     return err
                 d2.addCallbacks(_got, _nosuch)
             return d2
@@ -1583,14 +1725,47 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         return d
 
     def setAttrs(self, pathstring, attrs):
-        self.log(".setAttrs(%r, %r)" % (pathstring, attrs), level=OPERATIONAL)
+        request = ".setAttrs(%r, %r)" % (pathstring, attrs)
+        self.log(request, level=OPERATIONAL)
 
         if "size" in attrs:
             # this would require us to download and re-upload the truncated/extended
             # file contents
             def _unsupported(): raise SFTPError(FX_OP_UNSUPPORTED, "setAttrs wth size attribute unsupported")
             return defer.execute(_unsupported)
-        return defer.succeed(None)
+
+        path = self._path_from_string(pathstring)
+        userpath = self._path_to_utf8(path)
+        d = self._get_parent_or_node(path)
+        def _got_parent_or_node( (parent_or_node, childname) ):
+            if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
+
+            direntry = _direntry_for(parent_or_node, childname)
+            d2 = self._update_attrs_for_heisenfiles(userpath, direntry, attrs)
+
+            def _update(updated_heisenfiles):
+                if childname is None:
+                    if updated_heisenfiles:
+                        return None
+                    raise SFTPError(FX_NO_SUCH_FILE, userpath)
+                else:
+                    desired_metadata = _attrs_to_metadata(attrs)
+                    if noisy: self.log("desired_metadata = %r" % (desired_metadata,), level=NOISY)
+
+                    d3 = parent_or_node.set_metadata_for(childname, desired_metadata)
+                    def _nosuch(err):
+                        if updated_heisenfiles:
+                            err.trap(NoSuchChildError)
+                        else:
+                            return err
+                    d3.addErrback(_nosuch)
+                    return d3
+            d2.addCallback(_update)
+            d2.addCallback(lambda ign: None)
+            return d2
+        d.addCallback(_got_parent_or_node)
+        d.addBoth(_convert_error, request)
+        return d
 
     def readLink(self, pathstring):
         self.log(".readLink(%r)" % (pathstring,), level=OPERATIONAL)
@@ -1616,6 +1791,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         if extensionName == 'posix-rename@openssh.com':
             def _bad(): raise SFTPError(FX_BAD_MESSAGE, "could not parse posix-rename@openssh.com request")
 
+            if 4 > len(extensionData): return defer.execute(_bad)
             (fromPathLen,) = struct.unpack('>L', extensionData[0:4])
             if 8 + fromPathLen > len(extensionData): return defer.execute(_bad)
 
@@ -1635,6 +1811,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             return d
 
         if extensionName == 'statvfs@openssh.com' or extensionName == 'fstatvfs@openssh.com':
+            # f_bsize and f_frsize should be the same to avoid a bug in 'df'
             return defer.succeed(struct.pack('>11Q',
                 1024,         # uint64  f_bsize     /* file system block size */
                 1024,         # uint64  f_frsize    /* fundamental fs block size */
@@ -1664,6 +1841,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     def _path_from_string(self, pathstring):
         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
 
+        _assert(isinstance(pathstring, str), pathstring=pathstring)
+
         # The home directory is the root directory.
         pathstring = pathstring.strip("/")
         if pathstring == "" or pathstring == ".":
@@ -1713,60 +1892,77 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         d.addCallback(_got_root)
         return d
 
-    def _attrs_to_metadata(self, attrs):
-        metadata = {}
 
-        for key in attrs:
-            if key == "mtime" or key == "ctime" or key == "createtime":
-                metadata[key] = long(attrs[key])
-            elif key.startswith("ext_"):
-                metadata[key] = str(attrs[key])
+class FakeTransport:
+    implements(ITransport)
+    def write(self, data):
+        logmsg("FakeTransport.write(<data of length %r>)" % (len(data),), level=NOISY)
 
-        return metadata
+    def writeSequence(self, data):
+        logmsg("FakeTransport.writeSequence(...)", level=NOISY)
 
+    def loseConnection(self):
+        logmsg("FakeTransport.loseConnection()", level=NOISY)
 
-class SFTPUser(ConchUser, PrefixingLogMixin):
-    implements(ISession)
-    def __init__(self, check_abort, client, rootnode, username, convergence):
-        ConchUser.__init__(self)
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+    # getPeer and getHost can just raise errors, since we don't know what to return
 
-        self.channelLookup["session"] = session.SSHSession
-        self.subsystemLookup["sftp"] = FileTransferServer
 
-        self.check_abort = check_abort
-        self.client = client
-        self.root = rootnode
-        self.username = username
-        self.convergence = convergence
+class ShellSession(PrefixingLogMixin):
+    implements(ISession)
+    def __init__(self, userHandler):
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+        if noisy: self.log(".__init__(%r)" % (userHandler), level=NOISY)
 
     def getPty(self, terminal, windowSize, attrs):
         self.log(".getPty(%r, %r, %r)" % (terminal, windowSize, attrs), level=OPERATIONAL)
-        raise NotImplementedError
 
     def openShell(self, protocol):
         self.log(".openShell(%r)" % (protocol,), level=OPERATIONAL)
-        raise NotImplementedError
+        if hasattr(protocol, 'transport') and protocol.transport is None:
+            protocol.transport = FakeTransport()  # work around Twisted bug
+
+        return self._unsupported(protocol)
 
     def execCommand(self, protocol, cmd):
         self.log(".execCommand(%r, %r)" % (protocol, cmd), level=OPERATIONAL)
-        raise NotImplementedError
+        if hasattr(protocol, 'transport') and protocol.transport is None:
+            protocol.transport = FakeTransport()  # work around Twisted bug
+
+        d = defer.succeed(None)
+        if cmd == "df -P -k /":
+            d.addCallback(lambda ign: protocol.write(
+                          "Filesystem         1024-blocks      Used Available Capacity Mounted on\r\n"
+                          "tahoe                628318530 314159265 314159265      50% /\r\n"))
+            d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessDone(None))))
+        else:
+            d.addCallback(lambda ign: self._unsupported(protocol))
+        return d
+
+    def _unsupported(self, protocol):
+        d = defer.succeed(None)
+        d.addCallback(lambda ign: protocol.errReceived(
+                      "This server supports only the SFTP protocol. It does not support SCP,\r\n"
+                      "interactive shell sessions, or commands other than one needed by sshfs.\r\n"))
+        d.addCallback(lambda ign: protocol.processEnded(Reason(ProcessTerminated(exitCode=1))))
+        return d
 
     def windowChanged(self, newWindowSize):
         self.log(".windowChanged(%r)" % (newWindowSize,), level=OPERATIONAL)
 
-    def eofReceived():
+    def eofReceived(self):
         self.log(".eofReceived()", level=OPERATIONAL)
 
     def closed(self):
         self.log(".closed()", level=OPERATIONAL)
 
 
-# if you have an SFTPUser, and you want something that provides ISFTPServer,
-# then you get SFTPHandler(user)
-components.registerAdapter(SFTPHandler, SFTPUser, ISFTPServer)
+# If you have an SFTPUserHandler and want something that provides ISession, you get
+# ShellSession(userHandler).
+# We use adaptation because this must be a different object to the SFTPUserHandler.
+components.registerAdapter(ShellSession, SFTPUserHandler, ISession)
+
 
-from auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
+from allmydata.frontends.auth import AccountURLChecker, AccountFileChecker, NeedRootcapLookupScheme
 
 class Dispatcher:
     implements(portal.IRealm)
@@ -1774,7 +1970,7 @@ class Dispatcher:
         self._client = client
 
     def requestAvatar(self, avatarID, mind, interface):
-        assert interface == IConchUser, interface
+        _assert(interface == IConchUser, interface=interface)
         rootnode = self._client.create_node_from_uri(avatarID.rootcap)
         handler = SFTPUserHandler(self._client, rootnode, avatarID.username)
         return (interface, handler, handler.logout)
@@ -1783,6 +1979,9 @@ class Dispatcher:
 class SFTPServer(service.MultiService):
     def __init__(self, client, accountfile, accounturl,
                  sftp_portstr, pubkey_file, privkey_file):
+        precondition(isinstance(accountfile, unicode), accountfile)
+        precondition(isinstance(pubkey_file, unicode), pubkey_file)
+        precondition(isinstance(privkey_file, unicode), privkey_file)
         service.MultiService.__init__(self)
 
         r = Dispatcher(client)
@@ -1798,8 +1997,8 @@ class SFTPServer(service.MultiService):
             # we could leave this anonymous, with just the /uri/CAP form
             raise NeedRootcapLookupScheme("must provide an account file or URL")
 
-        pubkey = keys.Key.fromFile(pubkey_file)
-        privkey = keys.Key.fromFile(privkey_file)
+        pubkey = keys.Key.fromFile(pubkey_file.encode(get_filesystem_encoding()))
+        privkey = keys.Key.fromFile(privkey_file.encode(get_filesystem_encoding()))
         class SSHFactory(factory.SSHFactory):
             publicKeys = {pubkey.sshType(): pubkey}
             privateKeys = {privkey.sshType(): privkey}