From: david-sarah <david-sarah@jacaranda.org>
Date: Tue, 25 May 2010 23:02:57 +0000 (-0700)
Subject: SFTP: 'sync' any open files at a direntry before opening any new file at that direntr... 
X-Git-Tag: trac-4400~2
X-Git-Url: https://git.rkrishnan.org/simplejson/something?a=commitdiff_plain;h=0b888f82012e25e4cfb5af5904c222472286a185;p=tahoe-lafs%2Ftahoe-lafs.git

SFTP: 'sync' any open files at a direntry before opening any new file at that direntry. This works around the sshfs misbehaviour of returning success to clients immediately on close.
---

diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py
index b6ccefb9..45ff5e00 100644
--- a/src/allmydata/frontends/sftpd.py
+++ b/src/allmydata/frontends/sftpd.py
@@ -568,7 +568,7 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
 
         d = defer.Deferred()
         def _read(data):
-            if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
+            if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
 
             # "In response to this request, the server will read as many bytes as it
             #  can from the file (up to 'len'), and return them in a SSH_FXP_DATA
@@ -643,6 +643,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
         self.closed = False
+        self.added = False
         self.removed = False
         
         # self.consumer should only be relied on in callbacks for self.async, since it might
@@ -688,6 +689,13 @@ class GeneralSFTPFile(PrefixingLogMixin):
 
         self.removed = True
 
+    def sync(self):
+        self.log(".sync()", level=OPERATIONAL)
+
+        d = defer.Deferred()
+        self.async.addBoth(eventually_callback(d))
+        return d
+
     def readChunk(self, offset, length):
         request = ".readChunk(%r, %r)" % (offset, length)
         self.log(request, level=OPERATIONAL)
@@ -727,9 +735,9 @@ class GeneralSFTPFile(PrefixingLogMixin):
 
         # Note that we return without waiting for the write to occur. Reads and
         # close wait for prior writes, and will fail if any prior operation failed.
-        # This is ok because SFTP makes no guarantee that the request completes
-        # before the write. In fact it explicitly allows write errors to be delayed
-        # until close:
+        # This is ok because SFTP makes no guarantee that the write completes
+        # before the request does. In fact it explicitly allows write errors to be
+        # delayed until close:
         #   "One should note that on some server platforms even a close can fail.
         #    This can happen e.g. if the server operating system caches writes,
         #    and an error occurs while flushing cached writes during the close."
@@ -767,10 +775,12 @@ class GeneralSFTPFile(PrefixingLogMixin):
             if self.has_changed and not self.removed:
                 d2.addCallback(lambda ign: self.consumer.when_done())
                 if self.filenode and self.filenode.is_mutable():
+                    self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
                     d2.addCallback(lambda ign: self.consumer.get_current_size())
                     d2.addCallback(lambda size: self.consumer.read(0, size))
                     d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
                 else:
+                    self.added = True
                     def _add_file(ign):
                         self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
                         u = FileHandle(self.consumer.get_file(), self.convergence)
@@ -785,12 +795,14 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
 
         def _closed(res):
+            if noisy: self.log("_closed(%r)" % (res,), level=NOISY)
             self.close_notify(self.parent, self.childname, self)
 
             # It is possible for there to be a race between adding the file and removing it.
-            if self.removed:
+            if self.added and self.removed:
                 self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL)
                 d2 = self.parent.delete(self.childname)
+                d2.addBoth(_convert_error, request)  # just for logging
                 d2.addBoth(lambda ign: res)
                 return d2
             return res
@@ -899,6 +911,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                }
 
     def _add_open_files(self, direntry, files_to_add):
+        if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY)
+
         if direntry:
             if direntry in self._open_files:
                 self._open_files[direntry] += files_to_add
@@ -912,23 +926,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                 all_open_files[direntry] = (files_to_add, time())
 
     def _remove_any_open_files(self, direntry):
-        removed = False
+        if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY)
+
         if direntry in self._open_files:
-            for f in self._open_files[direntry]:
-                f.remove()
             del self._open_files[direntry]
-            removed = True
 
         if direntry in all_open_files:
             (files, opentime) = all_open_files[direntry]
             for f in files:
                 f.remove()
             del all_open_files[direntry]
-            removed = True
+            return True
+
+        return False
+
+    def _sync_open_files(self, direntry):
+        if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY)
 
-        return removed
+        d = defer.succeed(None)
+        if direntry in all_open_files:
+            (files, opentime) = all_open_files[direntry]
+            for f in files:
+                d.addCallback(lambda ign: f.sync())
+
+        return d
 
     def _close_notify(self, parent, childname, file_to_remove):
+        if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY)
+
         direntry = self._direntry_for(parent, childname)
         if direntry in self._open_files:
             old_files = self._open_files[direntry]
@@ -950,6 +975,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         """When an direntry is renamed, any open files for that direntry are also renamed.
         Return True if there were any open files at from_direntry."""
 
+        if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" %
+                           (from_parent, from_childname, to_parent, to_childname), level=NOISY)
+
         from_direntry = self._direntry_for(from_parent, from_childname)
         to_direntry = self._direntry_for(to_parent, to_childname)
 
@@ -964,11 +992,13 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         else:
             return False
 
-    def _direntry_for(self, parent, childname):
-        if parent and childname:
-            rw_uri = parent.get_write_uri()
-            if rw_uri:
+    def _direntry_for(self, filenode_or_parent, childname=None):
+        if filenode_or_parent:
+            rw_uri = filenode_or_parent.get_write_uri()
+            if rw_uri and childname:
                 return rw_uri + "/" + childname.encode('utf-8')
+            else:
+                return rw_uri
 
         return None
 
@@ -987,17 +1017,24 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         assert metadata is None or 'readonly' in metadata, metadata
         writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
 
-        if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
-            return ShortReadOnlySFTPFile(filenode, metadata)
+        if childname:
+            direntry = self._direntry_for(parent, childname)
         else:
-            direntry = None
-            if writing:
-                direntry = self._direntry_for(parent, childname)
+            direntry = self._direntry_for(filenode)
 
-            file = GeneralSFTPFile(self._close_notify, flags, self._convergence,
-                                   parent=parent, childname=childname, filenode=filenode, metadata=metadata)
-            self._add_open_files(direntry, [file])
-            return file
+        d = self._sync_open_files(direntry)
+
+        if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
+            d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata))
+        else:
+            d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence,
+                                                      parent=parent, childname=childname, filenode=filenode, metadata=metadata))
+            def _add_to_open(file):
+                if writing:
+                    self._add_open_files(direntry, [file])
+                return file
+            d.addCallback(_add_to_open)
+        return d
 
     def openFile(self, pathstring, flags, attrs):
         request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
@@ -1006,10 +1043,6 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # This is used for both reading and writing.
         # First exclude invalid combinations of flags.
 
-        # /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
-        # gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
-        # existing file gives the same.
-
         if not (flags & (FXF_READ | FXF_WRITE)):
             raise SFTPError(FX_BAD_MESSAGE,
                             "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
@@ -1040,9 +1073,14 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         #       d. the child is immutable: if we are trying to open it write-only or
         #          read/write, then we must be able to write to the parent directory.
         #
-        # To reduce latency, open succeeds as soon as these conditions are met, even
-        # though there might be a failure in downloading the existing file or uploading
-        # a new one.
+        # To reduce latency, open normally succeeds as soon as these conditions are
+        # met, even though there might be a failure in downloading the existing file
+        # or uploading a new one. However, there is an exception: if a file has been
+        # written, then closed, and is now being reopened, then we have to delay the
+        # open until the previous upload/publish has completed. This is necessary
+        # because sshfs does not wait for the result of an FXF_CLOSE message before
+        # reporting to the client that a file has been closed. It applies both to
+        # mutable files, and to directory entries linked to an immutable file.
         #
         # Note that the permission checks below are for more precise error reporting on
         # the open call; later operations would fail even if we did not make these checks.
@@ -1268,7 +1306,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             if childname is None:
                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
 
-            removed = self._remove_any_open_files(self._direntry_for(parent, childname))
+            direntry = self._direntry_for(parent, childname)
+            removed = self._remove_any_open_files(direntry)
 
             d2 = parent.get(childname)
             def _got_child(child):
@@ -1281,6 +1320,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             def _no_child(err):
                 if removed and err.check(NoSuchChildError):
                     # suppress NoSuchChildError if an open file was removed
+                    if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" %
+                                       (direntry,), level=NOISY)
                     return None
                 else:
                     return err
@@ -1339,6 +1380,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # reported as the update time of the best version. But that
         # information isn't currently stored in mutable shares, I think.
 
+        # TODO: some clients will incorrectly try to get the attributes
+        # of a file immediately after opening it, before it has been put
+        # into the all_open_files table. This is a race condition bug in
+        # the client, but we probably need to handle it anyway.
+
         path = self._path_from_string(pathstring)
         d = self._get_parent_or_node(path)
         def _got_parent_or_node( (parent_or_node, childname) ):