def close(self):
self.is_closed = True
self.finish()
- try:
- self.f.close()
- except EnvironmentError as e:
- self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
+ if not self.is_closed:
+ try:
+ self.f.close()
+ except BaseException as e:
+ self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
def unregisterProducer(self):
if self.producer:
parent = self.parent
childname = self.childname
- def _close(ign):
- d2 = defer.succeed(None)
- if self.has_changed and not abandoned:
- d2.addCallback(lambda ign: self.consumer.when_done())
- if self.filenode and self.filenode.is_mutable():
- self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
- d2.addCallback(lambda ign: self.consumer.get_current_size())
- d2.addCallback(lambda size: self.consumer.read(0, size))
- d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
- else:
- def _add_file(ign):
- self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
- u = FileHandle(self.consumer.get_file(), self.convergence)
- return parent.add_file(childname, u)
- d2.addCallback(_add_file)
+ # has_changed is set when writeChunk is called, not when the write occurs, so
+ # it is correct to optimize out the commit if it is False at the close call.
+ has_changed = self.has_changed
- def _committed(res):
- if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
+ def _committed(res):
+ if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
- self.consumer.close()
+ self.consumer.close()
+
+ # We must close_notify before re-firing self.async.
+ if self.close_notify:
+ self.close_notify(self.userpath, self.parent, self.childname, self)
+ return res
+
+ def _close(ign):
+ d2 = self.consumer.when_done()
+ if self.filenode and self.filenode.is_mutable():
+ self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
+ d2.addCallback(lambda ign: self.consumer.get_current_size())
+ d2.addCallback(lambda size: self.consumer.read(0, size))
+ d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
+ else:
+ def _add_file(ign):
+ self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
+ u = FileHandle(self.consumer.get_file(), self.convergence)
+ return parent.add_file(childname, u)
+ d2.addCallback(_add_file)
- # We must close_notify before re-firing self.async.
- if self.close_notify:
- self.close_notify(self.userpath, self.parent, self.childname, self)
- return res
d2.addBoth(_committed)
return d2
- self.async.addCallback(_close)
-
d = defer.Deferred()
+
+ # If the file has been abandoned, we don't want the close operation to get "stuck",
+ # even if self.async fails to re-fire. Doing the close independently of self.async
+ # in that case ensures that dropping an ssh connection is sufficient to abandon
+ # any heisenfiles that were not explicitly closed in that connection.
+ if abandoned or not has_changed:
+ d.addCallback(_committed)
+ else:
+ self.async.addCallback(_close)
+
self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
d.addBoth(_convert_error, request)
return d
return d
def _sync_heisenfiles(self, userpath, direntry, ignore=None):
- if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
+ request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
+ self.log(request, level=OPERATIONAL)
files = []
if direntry in all_heisenfiles:
d = defer.succeed(None)
for f in files:
if f is not ignore:
- d.addCallback(lambda ign: f.sync())
+ d.addBoth(lambda ign: f.sync())
+
+ def _done(ign):
+ self.log("done %r" % (request,), level=OPERATIONAL)
+ return None
+ d.addBoth(_done)
return d
def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):