SFTP: avoid blocking close on a heisenfile that has been abandoned or never changed...
authordavid-sarah <david-sarah@jacaranda.org>
Sun, 30 May 2010 02:55:44 +0000 (19:55 -0700)
committerdavid-sarah <david-sarah@jacaranda.org>
Sun, 30 May 2010 02:55:44 +0000 (19:55 -0700)
src/allmydata/frontends/sftpd.py

index e44270d83c89cfae01bf6cf9db577ccba34ec175..0d459f6e6dda0f2319c48b30d74ba07a2a5f7e0d 100644 (file)
@@ -526,10 +526,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
     def close(self):
         self.is_closed = True
         self.finish()
-        try:
-            self.f.close()
-        except EnvironmentError as e:
-            self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
+        if not self.is_closed:
+            try:
+                self.f.close()
+            except BaseException as e:
+                self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
 
     def unregisterProducer(self):
         if self.producer:
@@ -798,37 +799,48 @@ class GeneralSFTPFile(PrefixingLogMixin):
         parent = self.parent
         childname = self.childname
         
-        def _close(ign):
-            d2 = defer.succeed(None)
-            if self.has_changed and not abandoned:
-                d2.addCallback(lambda ign: self.consumer.when_done())
-                if self.filenode and self.filenode.is_mutable():
-                    self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
-                    d2.addCallback(lambda ign: self.consumer.get_current_size())
-                    d2.addCallback(lambda size: self.consumer.read(0, size))
-                    d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
-                else:
-                    def _add_file(ign):
-                        self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
-                        u = FileHandle(self.consumer.get_file(), self.convergence)
-                        return parent.add_file(childname, u)
-                    d2.addCallback(_add_file)
+        # has_changed is set when writeChunk is called, not when the write occurs, so
+        # it is correct to optimize out the commit if it is False at the close call.
+        has_changed = self.has_changed
 
-            def _committed(res):
-                if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
+        def _committed(res):
+            if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
 
-                self.consumer.close()
+            self.consumer.close()
+
+            # We must close_notify before re-firing self.async.
+            if self.close_notify:
+                self.close_notify(self.userpath, self.parent, self.childname, self)
+            return res
+
+        def _close(ign):
+            d2 = self.consumer.when_done()
+            if self.filenode and self.filenode.is_mutable():
+                self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
+                d2.addCallback(lambda ign: self.consumer.get_current_size())
+                d2.addCallback(lambda size: self.consumer.read(0, size))
+                d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
+            else:
+                def _add_file(ign):
+                    self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
+                    u = FileHandle(self.consumer.get_file(), self.convergence)
+                    return parent.add_file(childname, u)
+                d2.addCallback(_add_file)
 
-                # We must close_notify before re-firing self.async.
-                if self.close_notify:
-                    self.close_notify(self.userpath, self.parent, self.childname, self)
-                return res
             d2.addBoth(_committed)
             return d2
 
-        self.async.addCallback(_close)
-
         d = defer.Deferred()
+
+        # If the file has been abandoned, we don't want the close operation to get "stuck",
+        # even if self.async fails to re-fire. Doing the close independently of self.async
+        # in that case ensures that dropping an ssh connection is sufficient to abandon
+        # any heisenfiles that were not explicitly closed in that connection.
+        if abandoned or not has_changed:
+            d.addCallback(_committed)
+        else:
+            self.async.addCallback(_close)
+
         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
         d.addBoth(_convert_error, request)
         return d
@@ -1064,7 +1076,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         return d
 
     def _sync_heisenfiles(self, userpath, direntry, ignore=None):
-        if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
+        request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
+        self.log(request, level=OPERATIONAL)
 
         files = []
         if direntry in all_heisenfiles:
@@ -1075,7 +1088,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         d = defer.succeed(None)
         for f in files:
             if f is not ignore:
-                d.addCallback(lambda ign: f.sync())
+                d.addBoth(lambda ign: f.sync())
+
+        def _done(ign):
+            self.log("done %r" % (request,), level=OPERATIONAL)
+            return None
+        d.addBoth(_done)
         return d
 
     def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):