SFTP: fixed bugs that caused hangs during write (#1037).
authordavid-sarah <david-sarah@jacaranda.org>
Mon, 17 May 2010 04:42:28 +0000 (21:42 -0700)
committerdavid-sarah <david-sarah@jacaranda.org>
Mon, 17 May 2010 04:42:28 +0000 (21:42 -0700)
src/allmydata/frontends/sftpd.py
src/allmydata/test/test_sftp.py

index 24b084bfbc6b6ce39c7a6417e1777ba884b87a20..e978c508559662587c5884b543ec6cdddaad94d9 100644 (file)
@@ -318,6 +318,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
 
     def __init__(self, check_abort, download_size, tempfile_maker):
         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+        if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
         self.check_abort = check_abort
         self.download_size = download_size
         self.current_size = download_size
@@ -326,6 +327,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
         self.milestones = []  # empty heap of (offset, d)
         self.overwrites = []  # empty heap of (start, end)
         self.done = self.when_reached(download_size)  # adds a milestone
+        self.is_done = False
+        def _signal_done(ign):
+            if noisy: self.log("DONE", level=NOISY)
+            self.is_done = True
+        self.done.addCallback(_signal_done)
         self.producer = None
 
     def get_file(self):
@@ -346,13 +352,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
             self.finish()
 
     def registerProducer(self, p, streaming):
+        if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
         self.producer = p
         if streaming:
             # call resumeProducing once to start things off
             p.resumeProducing()
         else:
-            while not self.done:
-                p.resumeProducing()
+            def _iterate():
+                if not self.is_done:
+                    p.resumeProducing()
+                    eventually(_iterate)
+            _iterate()
 
     def write(self, data):
         if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
@@ -454,6 +464,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
 
         if offset + length > self.current_size:
             length = self.current_size - offset
+            if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
 
         needed = min(offset + length, self.download_size)
         d = self.when_reached(needed)
@@ -511,8 +522,8 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
 SIZE_THRESHOLD = 1000
 
 def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
-    if noisy: logmsg("_make_sftp_file(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
-                      (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
+    if noisy: logmsg("_make_sftp_file(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r" %
+                      (check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
 
     if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \
        not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
@@ -604,8 +615,8 @@ class GeneralSFTPFile(PrefixingLogMixin):
 
     def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
         PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        if noisy: self.log(".__init__(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
-                           (check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
+        if noisy: self.log(".__init__(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+                           (check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
 
         self.check_abort = check_abort
         self.flags = flags
@@ -643,7 +654,10 @@ class GeneralSFTPFile(PrefixingLogMixin):
                 download_size = filenode.get_size()
                 assert download_size is not None
                 self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
-                self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None))
+                def _read(ign):
+                    if noisy: self.log("_read immutable", level=NOISY)
+                    filenode.read(self.consumer, 0, None)
+                self.async.addCallback(_read)
 
     def readChunk(self, offset, length):
         self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL)
@@ -688,6 +702,8 @@ class GeneralSFTPFile(PrefixingLogMixin):
         #    and an error occurs while flushing cached writes during the close."
 
         def _write(ign):
+            if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
+                               (offset, len(data), self.consumer.get_current_size()), level=NOISY)
             # FXF_APPEND means that we should always write at the current end of file.
             write_offset = offset
             if self.flags & FXF_APPEND:
index 16e68efc31444f9ef2d1b3e86434d7534f1cf9bb..51a9f9624f6b297a4852f257b831638b66956a78 100644 (file)
@@ -23,10 +23,6 @@ if have_pycrypto:
     from twisted.conch.ssh import filetransfer as sftp
     from allmydata.frontends import sftpd
 
-# FIXME remove this
-#from twisted.internet.base import DelayedCall
-#DelayedCall.debug = True
-
 import traceback
 
 """
@@ -649,7 +645,6 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
         d.addCallback(lambda node: download_to_data(node))
         d.addCallback(lambda data: self.failUnlessReallyEqual(data, "0123456789"))
 
-        """
         # test that writing a zero-length file with EXCL only updates the directory once
         d.addCallback(lambda ign:
                       self.handler.openFile("zerolength", sftp.FXF_WRITE | sftp.FXF_CREAT |
@@ -681,15 +676,14 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
             d2.addCallback(lambda node: download_to_data(node))
             d2.addCallback(lambda data: self.failUnlessReallyEqual(data, ""))
 
-            d2 = wf.writeChunk(10, "0123456789")
-            d2.addCallback(wf.writeChunk(5, "01234"))
+            d2.addCallback(lambda ign: wf.writeChunk(10, "0123456789"))
+            d2.addCallback(lambda ign: wf.writeChunk(5, "01234"))
             d2.addCallback(lambda ign: wf.close())
             return d2
         d.addCallback(_write_excl_append)
         d.addCallback(lambda ign: self.root.get(u"exclappend"))
         d.addCallback(lambda node: download_to_data(node))
         d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345678901234"))
-        """
 
         # test WRITE | CREAT without TRUNC
         d.addCallback(lambda ign:
@@ -719,12 +713,11 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
         d.addCallback(_check_same_file)
         d.addCallback(lambda data: self.failUnlessReallyEqual(data, "mutable new! contents"))
 
-        """
         # test READ | WRITE without CREAT or TRUNC
         d.addCallback(lambda ign:
                       self.handler.openFile("small", sftp.FXF_READ | sftp.FXF_WRITE, {}))
         def _read_write(rwf):
-            d2 =  rwf.writeChunk(8, "0123")
+            d2 = rwf.writeChunk(8, "0123")
             d2.addCallback(lambda ign: rwf.readChunk(0, 100))
             d2.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
             d2.addCallback(lambda ign: rwf.close())
@@ -733,7 +726,7 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
         d.addCallback(lambda ign: self.root.get(u"small"))
         d.addCallback(lambda node: download_to_data(node))
         d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
-        """
+
         return d
 
     def test_removeFile(self):