]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
SFTP: cater to clients that assume a file is created as soon as they have made an...
authordavid-sarah <david-sarah@jacaranda.org>
Sat, 29 May 2010 04:52:53 +0000 (21:52 -0700)
committerdavid-sarah <david-sarah@jacaranda.org>
Sat, 29 May 2010 04:52:53 +0000 (21:52 -0700)
src/allmydata/frontends/sftpd.py
src/allmydata/test/test_sftp.py

index 45ff5e00159490bdc508345c62cbf62a95606d5e..e0516d81864c5a1d411de697c1a84b3c134729aa 100644 (file)
@@ -27,7 +27,7 @@ from allmydata.util import deferredutil
 
 from allmydata.util.consumer import download_to_data
 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
-     NoSuchChildError
+     NoSuchChildError, ChildOfWrongTypeError
 from allmydata.mutable.common import NotWriteableError
 from allmydata.immutable.upload import FileHandle
 
@@ -43,7 +43,7 @@ warnings.filterwarnings("ignore", category=DeprecationWarning,
 noisy = True
 use_foolscap_logging = True
 
-from allmydata.util.log import NOISY, OPERATIONAL, \
+from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin
 
 if use_foolscap_logging:
@@ -54,10 +54,10 @@ else:  # pragma: no cover
     def logerr(s, level=None):
         print s
     class PrefixingLogMixin:
-        def __init__(self, facility=None):
-            pass
+        def __init__(self, facility=None, prefix=''):
+            self.prefix = prefix
         def log(self, s, level=None):
-            print s
+            print "%r %s" % (self.prefix, s)
 
 
 def eventually_callback(d):
@@ -105,7 +105,7 @@ def _convert_error(res, request):
     if err.check(NoSuchChildError):
         childname = _utf8(err.value.args[0])
         raise SFTPError(FX_NO_SUCH_FILE, childname)
-    if err.check(NotWriteableError):
+    if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
         msg = _utf8(err.value.args[0])
         raise SFTPError(FX_PERMISSION_DENIED, msg)
     if err.check(ExistingChildError):
@@ -160,12 +160,7 @@ def _lsLine(name, attrs):
     perms = array.array('c', '-'*10)
     ft = stat.S_IFMT(mode)
     if   stat.S_ISDIR(ft):  perms[0] = 'd'
-    elif stat.S_ISCHR(ft):  perms[0] = 'c'
-    elif stat.S_ISBLK(ft):  perms[0] = 'b'
     elif stat.S_ISREG(ft):  perms[0] = '-'
-    elif stat.S_ISFIFO(ft): perms[0] = 'f'
-    elif stat.S_ISLNK(ft):  perms[0] = 'l'
-    elif stat.S_ISSOCK(ft): perms[0] = 's'
     else: perms[0] = '?'
     # user
     if mode&stat.S_IRUSR: perms[1] = 'r'
@@ -277,7 +272,7 @@ class EncryptedTemporaryFile(PrefixingLogMixin):
         self.key = os.urandom(16)  # AES-128
 
     def _crypt(self, offset, data):
-        # FIXME: use random-access AES (pycryptopp ticket #18)
+        # TODO: use random-access AES (pycryptopp ticket #18)
         offset_big = offset // 16
         offset_small = offset % 16
         iv = binascii.unhexlify("%032x" % offset_big)
@@ -329,7 +324,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
 
     The temporary file reflects the contents of the file that I represent, except that:
      - regions that have neither been downloaded nor overwritten, if present,
-       contain zeroes.
+       contain garbage.
      - the temporary file may be shorter than the represented file (it is never longer).
        The latter's current size is stored in self.current_size.
 
@@ -365,7 +360,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
                            (size, self.current_size, self.downloaded), level=NOISY)
         if size < self.current_size or size < self.downloaded:
             self.f.truncate(size)
+        if size > self.current_size:
+            self.overwrite(self.current_size, "\x00" * (size - self.current_size))
         self.current_size = size
+
+        # invariant: self.download_size <= self.current_size
         if size < self.download_size:
             self.download_size = size
         if self.downloaded >= self.download_size:
@@ -453,7 +452,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
 
     def overwrite(self, offset, data):
         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
-        if offset > self.download_size and offset > self.current_size:
+        if offset > self.current_size:
             # Normally writing at an offset beyond the current end-of-file
             # would leave a hole that appears filled with zeroes. However, an
             # EncryptedTemporaryFile doesn't behave like that (if there is a
@@ -462,14 +461,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
             # the gap between the current EOF and the offset.
 
             self.f.seek(self.current_size)
-            self.f.write("\x00" * (offset - self.current_size))            
+            self.f.write("\x00" * (offset - self.current_size))
+            start = self.current_size
         else:
             self.f.seek(offset)
+            start = offset
+
         self.f.write(data)
         end = offset + len(data)
         self.current_size = max(self.current_size, end)
         if end > self.downloaded:
-            heapq.heappush(self.overwrites, (offset, end))
+            heapq.heappush(self.overwrites, (start, end))
 
     def read(self, offset, length):
         """When the data has been read, callback the Deferred that we return with this data.
@@ -531,7 +533,10 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
     def close(self):
         self.is_closed = True
         self.finish()
-        self.f.close()
+        try:
+            self.f.close()
+        except EnvironmentError as e:
+            self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
 
     def unregisterProducer(self):
         if self.producer:
@@ -548,9 +553,9 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
     I am used only for short immutable files opened in read-only mode.
     The file contents are downloaded to memory when I am created."""
 
-    def __init__(self, filenode, metadata):
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
+    def __init__(self, userpath, filenode, metadata):
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+        if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)
 
         assert IFileNode.providedBy(filenode), filenode
         self.filenode = filenode
@@ -625,69 +630,90 @@ class GeneralSFTPFile(PrefixingLogMixin):
     storing the file contents. In order to allow write requests to be satisfied
     immediately, there is effectively a FIFO queue between requests made to this
     file handle, and requests to my OverwriteableFileConsumer. This queue is
-    implemented by the callback chain of self.async."""
+    implemented by the callback chain of self.async.
 
-    def __init__(self, close_notify, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        if noisy: self.log(".__init__(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
-                           (close_notify, flags, parent, childname, filenode, metadata), level=NOISY)
+    When first constructed, I am in an 'unopened' state that causes most
+    operations to be delayed until 'open' is called."""
 
-        self.close_notify = close_notify
+    def __init__(self, userpath, flags, close_notify, convergence):
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+        if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
+                           (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
+
+        self.userpath = userpath
         self.flags = flags
+        self.close_notify = close_notify
         self.convergence = convergence
-        self.parent = parent
-        self.childname = childname
-        self.filenode = filenode
-        self.metadata = metadata
-        self.async = defer.succeed(None)
+        self.async = defer.Deferred()
         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
         self.closed = False
-        self.added = False
-        self.removed = False
-        
+        self.abandoned = False
+        self.parent = None
+        self.childname = None
+        self.filenode = None
+        self.metadata = None
+
         # self.consumer should only be relied on in callbacks for self.async, since it might
         # not be set before then.
         self.consumer = None
-        tempfile_maker = EncryptedTemporaryFile
 
-        if (flags & FXF_TRUNC) or not filenode:
-            # We're either truncating or creating the file, so we don't need the old contents.
-            self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
-            self.consumer.finish()
-        else:
-            assert IFileNode.providedBy(filenode), filenode
-
-            # TODO: use download interface described in #993 when implemented.
-            if filenode.is_mutable():
-                self.async.addCallback(lambda ign: filenode.download_best_version())
-                def _downloaded(data):
-                    self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
-                    self.consumer.write(data)
-                    self.consumer.finish()
-                    return None
-                self.async.addCallback(_downloaded)
+    def open(self, parent=None, childname=None, filenode=None, metadata=None):
+        self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+                 (parent, childname, filenode, metadata), level=OPERATIONAL)
+
+        # If the file has been renamed, the new (parent, childname) takes precedence.
+        if self.parent is None:
+            self.parent = parent
+        if self.childname is None:
+            self.childname = childname
+        self.filenode = filenode
+        self.metadata = metadata
+
+        if not self.closed:
+            tempfile_maker = EncryptedTemporaryFile
+
+            if (self.flags & FXF_TRUNC) or not filenode:
+                # We're either truncating or creating the file, so we don't need the old contents.
+                self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
+                self.consumer.finish()
             else:
-                download_size = filenode.get_size()
-                assert download_size is not None, "download_size is None"
-                self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
-                def _read(ign):
-                    if noisy: self.log("_read immutable", level=NOISY)
-                    filenode.read(self.consumer, 0, None)
-                self.async.addCallback(_read)
+                assert IFileNode.providedBy(filenode), filenode
+
+                # TODO: use download interface described in #993 when implemented.
+                if filenode.is_mutable():
+                    self.async.addCallback(lambda ign: filenode.download_best_version())
+                    def _downloaded(data):
+                        self.consumer = OverwriteableFileConsumer(len(data), tempfile_maker)
+                        self.consumer.write(data)
+                        self.consumer.finish()
+                        return None
+                    self.async.addCallback(_downloaded)
+                else:
+                    download_size = filenode.get_size()
+                    assert download_size is not None, "download_size is None"
+                    self.consumer = OverwriteableFileConsumer(download_size, tempfile_maker)
+                    def _read(ign):
+                        if noisy: self.log("_read immutable", level=NOISY)
+                        filenode.read(self.consumer, 0, None)
+                    self.async.addCallback(_read)
+
+        eventually_callback(self.async)(None)
 
-        if noisy: self.log("__init__ done", level=NOISY)
+        if noisy: self.log("open done", level=NOISY)
+        return self
 
-    def rename(self, new_parent, new_childname):
-        self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL)
+    def rename(self, new_userpath, new_parent, new_childname):
+        self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
 
+        self.userpath = new_userpath
         self.parent = new_parent
         self.childname = new_childname
 
-    def remove(self):
-        self.log(".remove()", level=OPERATIONAL)
+    def abandon(self):
+        self.log(".abandon()", level=OPERATIONAL)
 
-        self.removed = True
+        self.abandoned = True
 
     def sync(self):
         self.log(".sync()", level=OPERATIONAL)
@@ -712,7 +738,6 @@ class GeneralSFTPFile(PrefixingLogMixin):
         def _read(ign):
             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
             d2 = self.consumer.read(offset, length)
-            d2.addErrback(_convert_error, request)
             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
             # It is correct to drop d2 here.
             return None
@@ -768,11 +793,21 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.closed = True
 
         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
-            return defer.execute(self.consumer.close)
-
+            def _readonly_close():
+                if self.consumer:
+                    self.consumer.close()
+            return defer.execute(_readonly_close)
+
+        # We must capture the abandoned, parent, and childname variables synchronously
+        # at the close call. This is needed by the correctness arguments in the comments
+        # for _abandon_any_heisenfiles and _rename_heisenfiles.
+        abandoned = self.abandoned
+        parent = self.parent
+        childname = self.childname
+        
         def _close(ign):
             d2 = defer.succeed(None)
-            if self.has_changed and not self.removed:
+            if self.has_changed and not abandoned:
                 d2.addCallback(lambda ign: self.consumer.when_done())
                 if self.filenode and self.filenode.is_mutable():
                     self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
@@ -780,33 +815,28 @@ class GeneralSFTPFile(PrefixingLogMixin):
                     d2.addCallback(lambda size: self.consumer.read(0, size))
                     d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
                 else:
-                    self.added = True
                     def _add_file(ign):
-                        self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
+                        self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                         u = FileHandle(self.consumer.get_file(), self.convergence)
-                        return self.parent.add_file(self.childname, u)
+                        return parent.add_file(childname, u)
                     d2.addCallback(_add_file)
 
-            d2.addCallback(lambda ign: self.consumer.close())
+            def _committed(res):
+                if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
+
+                self.consumer.close()
+
+                # We must close_notify before re-firing self.async.
+                if self.close_notify:
+                    self.close_notify(self.userpath, self.parent, self.childname, self)
+                return res
+            d2.addBoth(_committed)
             return d2
+
         self.async.addCallback(_close)
 
         d = defer.Deferred()
         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
-
-        def _closed(res):
-            if noisy: self.log("_closed(%r)" % (res,), level=NOISY)
-            self.close_notify(self.parent, self.childname, self)
-
-            # It is possible for there to be a race between adding the file and removing it.
-            if self.added and self.removed:
-                self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL)
-                d2 = self.parent.delete(self.childname)
-                d2.addBoth(_convert_error, request)  # just for logging
-                d2.addBoth(lambda ign: res)
-                return d2
-            return res
-        d.addBoth(_closed)
         d.addBoth(_convert_error, request)
         return d
 
@@ -877,18 +907,22 @@ class Reason:
         self.value = value
 
 
-# For each immutable file that has been opened with write flags
-# (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from
-# parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc).
+# A "heisenfile" is a file that has been opened with write flags
+# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
+# 'all_heisenfiles' maps from a direntry string to
+# (list_of_GeneralSFTPFile, open_time_utc).
+# A direntry string is parent_write_uri + "/" + childname_utf8 for
+# an immutable file, or file_write_uri for a mutable file.
 # Updates to this dict are single-threaded.
 
-all_open_files = {}
+all_heisenfiles = {}
+
 
 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     implements(ISFTPServer)
     def __init__(self, client, rootnode, username):
         ConchUser.__init__(self)
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)
 
         self.channelLookup["session"] = session.SSHSession
@@ -898,7 +932,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         self._root = rootnode
         self._username = username
         self._convergence = client.convergence
-        self._open_files = {}  # files created by this user handler and still open
+
+        # maps from UTF-8 paths for this user, to files written and still open
+        self._heisenfiles = {}
 
     def gotVersion(self, otherVersion, extData):
         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
@@ -910,87 +946,164 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                 'fstatvfs@openssh.com': '2',
                }
 
-    def _add_open_files(self, direntry, files_to_add):
-        if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY)
+    def logout(self):
+        self.log(".logout()", level=OPERATIONAL)
+
+        for files in self._heisenfiles.itervalues():
+            for f in files:
+                f.abandon()
+
+    def _add_heisenfiles_by_path(self, userpath, files):
+        if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY)
+
+        if userpath in self._heisenfiles:
+            self._heisenfiles[userpath] += files
+        else:
+            self._heisenfiles[userpath] = files
+
+    def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
+        if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY)
 
         if direntry:
-            if direntry in self._open_files:
-                self._open_files[direntry] += files_to_add
+            if direntry in all_heisenfiles:
+                (old_files, opentime) = all_heisenfiles[direntry]
+                all_heisenfiles[direntry] = (old_files + files_to_add, opentime)
             else:
-                self._open_files[direntry] = files_to_add
+                all_heisenfiles[direntry] = (files_to_add, time())
 
-            if direntry in all_open_files:
-                (old_files, opentime) = all_open_files[direntry]
-                all_open_files[direntry] = (old_files + files_to_add, opentime)
-            else:
-                all_open_files[direntry] = (files_to_add, time())
+    def _abandon_any_heisenfiles(self, userpath, direntry):
+        if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
 
-    def _remove_any_open_files(self, direntry):
-        if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY)
+        # First we synchronously mark all heisenfiles matching the userpath or direntry
+        # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
+        # each file that we abandoned.
+        #
+        # For each file, the call to .abandon() occurs:
+        #   * before the file is closed, in which case it will never be committed
+        #     (uploaded+linked or published); or
+        #   * after it is closed but before it has been close_notified, in which case the
+        #     .sync() ensures that it has been committed (successfully or not) before we
+        #     return.
+        #
+        # This avoids a race that might otherwise cause the file to be committed after
+        # the remove operation has completed.
+        #
+        # We return a Deferred that fires with True if any files were abandoned (this
+        # does not mean that they were not committed; it is used to determine whether
+        # a NoSuchChildError from the attempt to delete the file should be suppressed).
 
-        if direntry in self._open_files:
-            del self._open_files[direntry]
+        files = []
+        if direntry in all_heisenfiles:
+            (files, opentime) = all_heisenfiles[direntry]
+            del all_heisenfiles[direntry]
+        if userpath in self._heisenfiles:
+            files += self._heisenfiles[userpath]
+            del self._heisenfiles[userpath]
 
-        if direntry in all_open_files:
-            (files, opentime) = all_open_files[direntry]
-            for f in files:
-                f.remove()
-            del all_open_files[direntry]
-            return True
+        for f in files:
+            f.abandon()
 
-        return False
+        d = defer.succeed(None)
+        for f in files:
+            d.addBoth(lambda ign: f.sync())
 
-    def _sync_open_files(self, direntry):
-        if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY)
+        d.addBoth(lambda ign: len(files) > 0)
+        return d
+
+    def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
+                            to_userpath, to_parent, to_childname, overwrite=True):
+        if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
+                           (from_userpath, from_parent, from_childname,
+                            to_userpath, to_parent, to_childname, overwrite), level=NOISY)
+
+        # First we synchronously rename all heisenfiles matching the userpath or direntry.
+        # Then we .sync() each file that we renamed.
+        #
+        # For each file, the call to .rename occurs:
+        #   * before the file is closed, in which case it will be committed at the
+        #     new direntry; or
+        #   * after it is closed but before it has been close_notified, in which case the
+        #     .sync() ensures that it has been committed (successfully or not) before we
+        #     return.
+        #
+        # This avoids a race that might otherwise cause the file to be committed at the
+        # old name after the rename operation has completed.
+        #
+        # Note that if overwrite is False, the caller should already have checked
+        # whether a real direntry exists at the destination. It is possible that another
+        # direntry (heisen or real) comes to exist at the destination after that check,
+        # but in that case it is correct for the rename to succeed (and for the commit
+        # of the heisenfile at the destination to possibly clobber the other entry, since
+        # that can happen anyway when we have concurrent write handles to the same direntry).
+        #
+        # We return a Deferred that fires with True if any files were renamed (this
+        # does not mean that they were not committed; it is used to determine whether
+        # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
+        # is False and there were already heisenfiles at the destination userpath or
+        # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).
+
+        from_direntry = self._direntry_for(from_parent, from_childname)
+        to_direntry = self._direntry_for(to_parent, to_childname)
+
+        if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
+            def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+            return defer.execute(_existing)
+
+        from_files = []
+        if from_direntry in all_heisenfiles:
+            (from_files, opentime) = all_heisenfiles[from_direntry]
+            del all_heisenfiles[from_direntry]
+        if from_userpath in self._heisenfiles:
+            from_files += self._heisenfiles[from_userpath]
+            del self._heisenfiles[from_userpath]
+
+        self._add_heisenfiles_by_direntry(to_direntry, from_files)
+        self._add_heisenfiles_by_path(to_userpath, from_files)
+
+        for f in from_files:
+            f.rename(to_userpath, to_parent, to_childname)
 
         d = defer.succeed(None)
-        if direntry in all_open_files:
-            (files, opentime) = all_open_files[direntry]
-            for f in files:
-                d.addCallback(lambda ign: f.sync())
+        for f in from_files:
+            d.addBoth(lambda ign: f.sync())
 
+        d.addBoth(lambda ign: len(from_files) > 0)
         return d
 
-    def _close_notify(self, parent, childname, file_to_remove):
-        if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY)
+    def _sync_heisenfiles(self, userpath, direntry, ignore=None):
+        if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
 
-        direntry = self._direntry_for(parent, childname)
-        if direntry in self._open_files:
-            old_files = self._open_files[direntry]
-            new_files = [f for f in old_files if f is not file_to_remove]
-            if len(new_files) > 0:
-                self._open_files[direntry] = new_files
-            else:
-                del self._open_files[direntry]
+        files = []
+        if direntry in all_heisenfiles:
+            (files, opentime) = all_heisenfiles[direntry]
+        if userpath in self._heisenfiles:
+            files += self._heisenfiles[userpath]
 
-        if direntry in all_open_files:
-            (all_old_files, opentime) = all_open_files[direntry]
+        d = defer.succeed(None)
+        for f in files:
+            if f is not ignore:
+                d.addCallback(lambda ign: f.sync())
+        return d
+
+    def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
+        if noisy: self.log("._remove_file(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+
+        direntry = self._direntry_for(parent, childname)
+        if direntry in all_heisenfiles:
+            (all_old_files, opentime) = all_heisenfiles[direntry]
             all_new_files = [f for f in all_old_files if f is not file_to_remove]
             if len(all_new_files) > 0:
-                all_open_files[direntry] = (all_new_files, opentime)
+                all_heisenfiles[direntry] = (all_new_files, opentime)
             else:
-                del all_open_files[direntry]
-
-    def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname):
-        """When an direntry is renamed, any open files for that direntry are also renamed.
-        Return True if there were any open files at from_direntry."""
+                del all_heisenfiles[direntry]
 
-        if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" %
-                           (from_parent, from_childname, to_parent, to_childname), level=NOISY)
-
-        from_direntry = self._direntry_for(from_parent, from_childname)
-        to_direntry = self._direntry_for(to_parent, to_childname)
-
-        if from_direntry in all_open_files:
-            (from_files, opentime) = all_open_files[from_direntry]
-            del self._open_files[from_direntry]
-            del all_open_files[from_direntry]
-            for file in from_files:
-                file.rename(to_parent, to_childname)
-            self._add_open_files(to_direntry, from_files)
-            return True
-        else:
-            return False
+        if userpath in self._heisenfiles:
+            old_files = self._heisenfiles[userpath]
+            new_files = [f for f in old_files if f is not file_to_remove]
+            if len(new_files) > 0:
+                self._heisenfiles[userpath] = new_files
+            else:
+                del self._heisenfiles[userpath]
 
     def _direntry_for(self, filenode_or_parent, childname=None):
         if filenode_or_parent:
@@ -1002,38 +1115,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         return None
 
-    def logout(self):
-        self.log(".logout()", level=OPERATIONAL)
-
-        for (direntry, files_at_direntry) in enumerate(self._open_files):
-            for f in files_at_direntry:
-                f.remove()
-                f.close()
-
-    def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None):
-        if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
-                           (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY)
+    def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
+        if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+                           (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
+                           level=NOISY)
 
         assert metadata is None or 'readonly' in metadata, metadata
-        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
 
+        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
         if childname:
             direntry = self._direntry_for(parent, childname)
         else:
             direntry = self._direntry_for(filenode)
 
-        d = self._sync_open_files(direntry)
+        d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)
 
         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
-            d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata))
+            d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
         else:
-            d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence,
-                                                      parent=parent, childname=childname, filenode=filenode, metadata=metadata))
-            def _add_to_open(file):
+            close_notify = None
+            if writing:
+                close_notify = self._remove_heisenfile
+
+            d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
+            def _got_file(file):
                 if writing:
-                    self._add_open_files(direntry, [file])
-                return file
-            d.addCallback(_add_to_open)
+                    self._add_heisenfiles_by_direntry(direntry, [file])
+                return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+            d.addCallback(_got_file)
         return d
 
     def openFile(self, pathstring, flags, attrs):
@@ -1041,21 +1150,43 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         self.log(request, level=OPERATIONAL)
 
         # This is used for both reading and writing.
-        # First exclude invalid combinations of flags.
+        # First exclude invalid combinations of flags, and empty paths.
 
         if not (flags & (FXF_READ | FXF_WRITE)):
-            raise SFTPError(FX_BAD_MESSAGE,
-                            "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+            def _bad_readwrite():
+                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+            return defer.execute(_bad_readwrite)
 
         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
-            raise SFTPError(FX_BAD_MESSAGE,
-                            "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+            def _bad_exclcreat():
+                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+            return defer.execute(_bad_exclcreat)
 
         path = self._path_from_string(pathstring)
         if not path:
-            raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+            def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+            return defer.execute(_emptypath)
+
+        # The combination of flags is potentially valid.
 
-        # The combination of flags is potentially valid. Now there are two major cases:
+        # To work around clients that have race condition bugs, a getAttr, rename, or
+        # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
+        # should succeed even if the 'open' request has not yet completed. So we now
+        # synchronously add a file object into the self._heisenfiles dict, indexed
+        # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
+        # because we don't yet have a user-independent path for the file.) The file
+        # object does not know its filenode, parent, or childname at this point.
+
+        userpath = self._path_to_utf8(path)
+
+        if flags & (FXF_WRITE | FXF_CREAT):
+            file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
+            self._add_heisenfiles_by_path(userpath, [file])
+        else:
+            # We haven't decided which file implementation to use yet.
+            file = None
+
+        # Now there are two major cases:
         #
         #  1. The path is specified as /uri/FILECAP, with no parent directory.
         #     If the FILECAP is mutable and writeable, then we can open it in write-only
@@ -1104,10 +1235,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                     raise SFTPError(FX_FAILURE,
                                     "cannot create a file exclusively when it already exists")
 
-                # The file does not need to be added to all_open_files, because it is not
+                # The file does not need to be added to all_heisenfiles, because it is not
                 # associated with a directory entry that needs to be updated.
 
-                return self._make_file(flags, filenode=root)
+                return self._make_file(file, userpath, flags, filenode=root)
             else:
                 # case 2
                 childname = path[-1]
@@ -1170,7 +1301,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                                             "cannot open a file for writing when the parent directory is read-only")
 
                         metadata['readonly'] = _is_readonly(parent_readonly, filenode)
-                        return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+                        return self._make_file(file, userpath, flags, parent=parent, childname=childname,
+                                               filenode=filenode, metadata=metadata)
                     def _no_child(f):
                         if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
                         f.trap(NoSuchChildError)
@@ -1182,7 +1314,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                             raise SFTPError(FX_PERMISSION_DENIED,
                                             "cannot create a file when the parent directory is read-only")
 
-                        return self._make_file(flags, parent=parent, childname=childname)
+                        return self._make_file(file, userpath, flags, parent=parent, childname=childname)
                     d3.addCallbacks(_got_child, _no_child)
                     return d3
 
@@ -1190,6 +1322,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                 return d2
 
         d.addCallback(_got_root)
+        def _remove_on_error(err):
+            if file:
+                self._remove_heisenfile(userpath, None, None, file)
+            return err
+        d.addErrback(_remove_on_error)
         d.addBoth(_convert_error, request)
         return d
 
@@ -1199,16 +1336,18 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
         from_path = self._path_from_string(from_pathstring)
         to_path = self._path_from_string(to_pathstring)
+        from_userpath = self._path_to_utf8(from_path)
+        to_userpath = self._path_to_utf8(to_path)
 
         # the target directory must already exist
         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
                                         self._get_parent_or_node(to_path)])
         def _got( (from_pair, to_pair) ):
-            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
-                               (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY)
+            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
+                               (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
             (from_parent, from_childname) = from_pair
             (to_parent, to_childname) = to_pair
-            
+
             if from_childname is None:
                 raise SFTPError(FX_NO_SUCH_FILE, "cannot rename a source object specified by URI")
             if to_childname is None:
@@ -1217,33 +1356,47 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
             # "It is an error if there already exists a file with the name specified
             #  by newpath."
+            # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
+            #
             # For the standard SSH_FXP_RENAME operation, overwrite=False.
             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.
 
-            # FIXME: use move_child_to_path to avoid possible data loss due to #943
-            #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
-
-            d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
-            def _check(err):
-                if noisy: self.log("_check(%r) in .renameFile(%r, %r)" %
-                                   (err, from_pathstring, to_pathstring), level=NOISY)
-
-                if not isinstance(err, Failure) or err.check(NoSuchChildError):
-                    # If there are open files to be written at the 'from' direntry, then ensure
-                    # they will now be written at the 'to' direntry instead.
-                    if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" %
-                                       (self._open_files, all_open_files), level=NOISY)
-                    if self._rename_open_files(from_parent, from_childname, to_parent, to_childname):
-                        # suppress the NoSuchChildError if any open files were renamed
-                        if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" %
-                                           (self._open_files, all_open_files), level=NOISY)
+            d2 = defer.fail(NoSuchChildError())
+            if not overwrite:
+                d2.addCallback(lambda ign: to_parent.get(to_childname))
+            def _expect_fail(res):
+                if not isinstance(res, Failure):
+                    raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+
+                # It is OK if we fail for errors other than NoSuchChildError, since that probably
+                # indicates some problem accessing the destination directory.
+                res.trap(NoSuchChildError)
+            d2.addBoth(_expect_fail)
+
+            # If there are heisenfiles to be written at the 'from' direntry, then ensure
+            # they will now be written at the 'to' direntry instead.
+            d2.addCallback(lambda ign:
+                           self._rename_heisenfiles(from_userpath, from_parent, from_childname,
+                                                    to_userpath, to_parent, to_childname, overwrite=overwrite))
+
+            def _move(renamed):
+                # FIXME: use move_child_to_path to avoid possible data loss due to #943
+                #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
+
+                d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
+                def _check(err):
+                    if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
+                                       (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
+
+                    if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
                         return None
-                elif err.check(ExistingChildError):
-                    # OpenSSH SFTP server returns FX_PERMISSION_DENIED
-                    raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring)
+                    if not overwrite and err.check(ExistingChildError):
+                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
 
-                return err
-            d2.addBoth(_check)
+                    return err
+                d3.addBoth(_check)
+                return d3
+            d2.addCallback(_move)
             return d2
         d.addCallback(_got)
         d.addBoth(_convert_error, request)
@@ -1297,35 +1450,20 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         return d
 
     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
-        d = defer.maybeDeferred(self._get_parent_or_node, path)
+        userpath = self._path_to_utf8(path)
+        d = self._get_parent_or_node(path)
         def _got_parent( (parent, childname) ):
-            # FIXME (minor): there is a race condition between the 'get' and 'delete',
-            # so it is possible that the must_be_directory or must_be_file restrictions
-            # might not be enforced correctly if the type has just changed.
-
             if childname is None:
                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
 
             direntry = self._direntry_for(parent, childname)
-            removed = self._remove_any_open_files(direntry)
-
-            d2 = parent.get(childname)
-            def _got_child(child):
-                # Unknown children can be removed by either removeFile or removeDirectory.
-                if must_be_directory and IFileNode.providedBy(child):
-                    raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
-                if must_be_file and IDirectoryNode.providedBy(child):
-                    raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
-                return parent.delete(childname)
-            def _no_child(err):
-                if removed and err.check(NoSuchChildError):
-                    # suppress NoSuchChildError if an open file was removed
-                    if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" %
-                                       (direntry,), level=NOISY)
-                    return None
-                else:
-                    return err
-            d2.addCallbacks(_got_child, _no_child)
+            d2 = defer.succeed(False)
+            if not must_be_directory:
+                d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))
+
+            d2.addCallback(lambda abandoned:
+                           parent.delete(childname, must_exist=not abandoned,
+                                         must_be_directory=must_be_directory, must_be_file=must_be_file))
             return d2
         d.addCallback(_got_parent)
         return d
@@ -1380,24 +1518,28 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # reported as the update time of the best version. But that
         # information isn't currently stored in mutable shares, I think.
 
-        # TODO: some clients will incorrectly try to get the attributes
+        # Some clients will incorrectly try to get the attributes
         # of a file immediately after opening it, before it has been put
-        # into the all_open_files table. This is a race condition bug in
+        # into the all_heisenfiles table. This is a race condition bug in
         # the client, but we probably need to handle it anyway.
 
         path = self._path_from_string(pathstring)
+        userpath = self._path_to_utf8(path)
         d = self._get_parent_or_node(path)
         def _got_parent_or_node( (parent_or_node, childname) ):
             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)
+
+            direntry = self._direntry_for(parent_or_node, childname)
+            d2 = self._sync_heisenfiles(userpath, direntry)
+
             if childname is None:
                 node = parent_or_node
-                d2 = node.get_current_size()
+                d2.addCallback(lambda ign: node.get_current_size())
                 d2.addCallback(lambda size:
                                _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
-                return d2
             else:
                 parent = parent_or_node
-                d2 = parent.get_child_and_metadata_at_path([childname])
+                d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
                 def _got( (child, metadata) ):
                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
                     assert IDirectoryNode.providedBy(parent), parent
@@ -1409,10 +1551,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
                     err.trap(NoSuchChildError)
                     direntry = self._direntry_for(parent, childname)
-                    if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r\ndirentry=%r" %
-                                       (self._open_files, all_open_files, direntry), level=NOISY)
-                    if direntry in all_open_files:
-                        (files, opentime) = all_open_files[direntry]
+                    if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
+                                       (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
+                    if direntry in all_heisenfiles:
+                        (files, opentime) = all_heisenfiles[direntry]
                         sftptime = _to_sftp_time(opentime)
                         # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
                         return {'permissions': S_IFREG | 0666,
@@ -1424,7 +1566,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                                }
                     return err
                 d2.addCallbacks(_got, _nosuch)
-                return d2
+            return d2
         d.addCallback(_got_parent_or_node)
         d.addBoth(_convert_error, request)
         return d
@@ -1503,8 +1645,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     def realPath(self, pathstring):
         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)
 
-        path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)]
-        return "/" + "/".join(path_utf8)
+        return self._path_to_utf8(self._path_from_string(pathstring))
+
+    def _path_to_utf8(self, path):
+        return (u"/" + u"/".join(path)).encode('utf-8')
 
     def _path_from_string(self, pathstring):
         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
@@ -1537,11 +1681,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
 
     def _get_root(self, path):
         # return Deferred (root, remaining_path)
+        d = defer.succeed(None)
         if path and path[0] == u"uri":
-            d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8'))
+            d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
             d.addCallback(lambda root: (root, path[2:]))
         else:
-            d = defer.succeed((self._root, path))
+            d.addCallback(lambda ign: (self._root, path))
         return d
 
     def _get_parent_or_node(self, path):
index b3baef2f42cf120e061f664011e55cc6d97a122f..b2ed04103387aa574f8d98c378766a77fc572a4f 100644 (file)
@@ -34,7 +34,7 @@ from allmydata.test.common import ShouldFailMixin
 timeout = 240
 
 class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
-    """This is a no-network unit test of the SFTPHandler class."""
+    """This is a no-network unit test of the SFTPUserHandler and the abstractions it uses."""
 
     if not have_pycrypto:
         skip = "SFTP support requires pycrypto, which is not installed"