From: david-sarah Date: Mon, 17 May 2010 04:42:28 +0000 (-0700) Subject: SFTP: fixed bugs that caused hangs during write (#1037). X-Git-Tag: trac-4400~24 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/URI:LIT:krugkidfnzsc4/@@named=?a=commitdiff_plain;h=57699fd1eb45c9fcc7e3caad35e0b767e0dacf64;p=tahoe-lafs%2Ftahoe-lafs.git SFTP: fixed bugs that caused hangs during write (#1037). --- diff --git a/src/allmydata/frontends/sftpd.py b/src/allmydata/frontends/sftpd.py index 24b084bf..e978c508 100644 --- a/src/allmydata/frontends/sftpd.py +++ b/src/allmydata/frontends/sftpd.py @@ -318,6 +318,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin): def __init__(self, check_abort, download_size, tempfile_maker): PrefixingLogMixin.__init__(self, facility="tahoe.sftp") + if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY) self.check_abort = check_abort self.download_size = download_size self.current_size = download_size @@ -326,6 +327,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin): self.milestones = [] # empty heap of (offset, d) self.overwrites = [] # empty heap of (start, end) self.done = self.when_reached(download_size) # adds a milestone + self.is_done = False + def _signal_done(ign): + if noisy: self.log("DONE", level=NOISY) + self.is_done = True + self.done.addCallback(_signal_done) self.producer = None def get_file(self): @@ -346,13 +352,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin): self.finish() def registerProducer(self, p, streaming): + if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY) self.producer = p if streaming: # call resumeProducing once to start things off p.resumeProducing() else: - while not self.done: - p.resumeProducing() + def _iterate(): + if not self.is_done: + p.resumeProducing() + eventually(_iterate) + _iterate() def write(self, data): if noisy: self.log(".write()" % (len(data),), level=NOISY) @@ -454,6 +464,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin): if offset + length > self.current_size: length = self.current_size - offset + if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY) needed = min(offset + length, self.download_size) d = self.when_reached(needed) @@ -511,8 +522,8 @@ class OverwriteableFileConsumer(PrefixingLogMixin): SIZE_THRESHOLD = 1000 def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None): - if noisy: logmsg("_make_sftp_file(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r" % - (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY) + if noisy: logmsg("_make_sftp_file(%r, %r, , parent=%r, childname=%r, filenode=%r, metadata=%r" % + (check_abort, flags, parent, childname, filenode, metadata), level=NOISY) if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \ not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD: @@ -604,8 +615,8 @@ class GeneralSFTPFile(PrefixingLogMixin): def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None): PrefixingLogMixin.__init__(self, facility="tahoe.sftp") - if noisy: self.log(".__init__(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" % - (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY) + if noisy: self.log(".__init__(%r, %r, , parent=%r, childname=%r, filenode=%r, metadata=%r)" % + (check_abort, flags, parent, childname, filenode, metadata), level=NOISY) self.check_abort = check_abort self.flags = flags @@ -643,7 +654,10 @@ class GeneralSFTPFile(PrefixingLogMixin): download_size = filenode.get_size() assert download_size is not None self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker) - self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None)) + def _read(ign): + if noisy: self.log("_read immutable", level=NOISY) + filenode.read(self.consumer, 0, None) + self.async.addCallback(_read) def readChunk(self, offset, length): self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL) @@ -688,6 +702,8 @@ class GeneralSFTPFile(PrefixingLogMixin): # and an error occurs while flushing cached writes during the close." def _write(ign): + if noisy: self.log("_write in .writeChunk(%r, ), current_size = %r" % + (offset, len(data), self.consumer.get_current_size()), level=NOISY) # FXF_APPEND means that we should always write at the current end of file. write_offset = offset if self.flags & FXF_APPEND: diff --git a/src/allmydata/test/test_sftp.py b/src/allmydata/test/test_sftp.py index 16e68efc..51a9f962 100644 --- a/src/allmydata/test/test_sftp.py +++ b/src/allmydata/test/test_sftp.py @@ -23,10 +23,6 @@ if have_pycrypto: from twisted.conch.ssh import filetransfer as sftp from allmydata.frontends import sftpd -# FIXME remove this -#from twisted.internet.base import DelayedCall -#DelayedCall.debug = True - import traceback """ @@ -649,7 +645,6 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase): d.addCallback(lambda node: download_to_data(node)) d.addCallback(lambda data: self.failUnlessReallyEqual(data, "0123456789")) - """ # test that writing a zero-length file with EXCL only updates the directory once d.addCallback(lambda ign: self.handler.openFile("zerolength", sftp.FXF_WRITE | sftp.FXF_CREAT | @@ -681,15 +676,14 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase): d2.addCallback(lambda node: download_to_data(node)) d2.addCallback(lambda data: self.failUnlessReallyEqual(data, "")) - d2 = wf.writeChunk(10, "0123456789") - d2.addCallback(wf.writeChunk(5, "01234")) + d2.addCallback(lambda ign: wf.writeChunk(10, "0123456789")) + d2.addCallback(lambda ign: wf.writeChunk(5, "01234")) d2.addCallback(lambda ign: wf.close()) return d2 d.addCallback(_write_excl_append) d.addCallback(lambda ign: self.root.get(u"exclappend")) d.addCallback(lambda node: download_to_data(node)) d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345678901234")) - """ # test WRITE | CREAT without TRUNC d.addCallback(lambda ign: @@ -719,12 +713,11 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase): d.addCallback(_check_same_file) d.addCallback(lambda data: self.failUnlessReallyEqual(data, "mutable new! contents")) - """ # test READ | WRITE without CREAT or TRUNC d.addCallback(lambda ign: self.handler.openFile("small", sftp.FXF_READ | sftp.FXF_WRITE, {})) def _read_write(rwf): - d2 = rwf.writeChunk(8, "0123") + d2 = rwf.writeChunk(8, "0123") d2.addCallback(lambda ign: rwf.readChunk(0, 100)) d2.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123")) d2.addCallback(lambda ign: rwf.close()) @@ -733,7 +726,7 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase): d.addCallback(lambda ign: self.root.get(u"small")) d.addCallback(lambda node: download_to_data(node)) d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123")) - """ + return d def test_removeFile(self):