From: david-sarah Date: Sun, 30 May 2010 02:55:44 +0000 (-0700) Subject: SFTP: avoid blocking close on a heisenfile that has been abandoned or never changed... X-Git-Url: https://git.rkrishnan.org/components/com_hotproperty/flags/reliability?a=commitdiff_plain;h=4be24a89dfbad5cadfa4d1bae4105c352995577a;p=tahoe-lafs%2Ftahoe-lafs.git SFTP: avoid blocking close on a heisenfile that has been abandoned or never changed. Also, improve the logging to help track down a case where OpenOffice hangs on opening a file with FXF_READ|FXF_WRITE. --- diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py index e44270d8..0d459f6e 100644 --- a/src/allmydata/frontends/sftpd.py +++ b/src/allmydata/frontends/sftpd.py @@ -526,10 +526,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin): def close(self): self.is_closed = True self.finish() - try: - self.f.close() - except EnvironmentError as e: - self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD) + if not self.is_closed: + try: + self.f.close() + except BaseException as e: + self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD) def unregisterProducer(self): if self.producer: @@ -798,37 +799,48 @@ class GeneralSFTPFile(PrefixingLogMixin): parent = self.parent childname = self.childname - def _close(ign): - d2 = defer.succeed(None) - if self.has_changed and not abandoned: - d2.addCallback(lambda ign: self.consumer.when_done()) - if self.filenode and self.filenode.is_mutable(): - self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL) - d2.addCallback(lambda ign: self.consumer.get_current_size()) - d2.addCallback(lambda size: self.consumer.read(0, size)) - d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents)) - else: - def _add_file(ign): - self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL) - u = FileHandle(self.consumer.get_file(), self.convergence) - return parent.add_file(childname, u) - d2.addCallback(_add_file) + # has_changed is set when writeChunk is called, not when the write occurs, so + # it is correct to optimize out the commit if it is False at the close call. + has_changed = self.has_changed - def _committed(res): - if noisy: self.log("_committed(%r)" % (res,), level=NOISY) + def _committed(res): + if noisy: self.log("_committed(%r)" % (res,), level=NOISY) - self.consumer.close() + self.consumer.close() + + # We must close_notify before re-firing self.async. + if self.close_notify: + self.close_notify(self.userpath, self.parent, self.childname, self) + return res + + def _close(ign): + d2 = self.consumer.when_done() + if self.filenode and self.filenode.is_mutable(): + self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL) + d2.addCallback(lambda ign: self.consumer.get_current_size()) + d2.addCallback(lambda size: self.consumer.read(0, size)) + d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents)) + else: + def _add_file(ign): + self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL) + u = FileHandle(self.consumer.get_file(), self.convergence) + return parent.add_file(childname, u) + d2.addCallback(_add_file) - # We must close_notify before re-firing self.async. - if self.close_notify: - self.close_notify(self.userpath, self.parent, self.childname, self) - return res d2.addBoth(_committed) return d2 - self.async.addCallback(_close) - d = defer.Deferred() + + # If the file has been abandoned, we don't want the close operation to get "stuck", + # even if self.async fails to re-fire. Doing the close independently of self.async + # in that case ensures that dropping an ssh connection is sufficient to abandon + # any heisenfiles that were not explicitly closed in that connection. + if abandoned or not has_changed: + d.addCallback(_committed) + else: + self.async.addCallback(_close) + self.async.addCallbacks(eventually_callback(d), eventually_errback(d)) d.addBoth(_convert_error, request) return d @@ -1064,7 +1076,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): return d def _sync_heisenfiles(self, userpath, direntry, ignore=None): - if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY) + request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore) + self.log(request, level=OPERATIONAL) files = [] if direntry in all_heisenfiles: @@ -1075,7 +1088,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin): d = defer.succeed(None) for f in files: if f is not ignore: - d.addCallback(lambda ign: f.sync()) + d.addBoth(lambda ign: f.sync()) + + def _done(ign): + self.log("done %r" % (request,), level=OPERATIONAL) + return None + d.addBoth(_done) return d def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):