From: Brian Warner <warner@allmydata.com>
Date: Tue, 7 Oct 2008 01:06:05 +0000 (-0700)
Subject: ftp: change the twisted hack necessary for async-write-close, to one more agreeable... 
X-Git-Url: https://git.rkrishnan.org/simplejson/...?a=commitdiff_plain;h=05a83601770daa95282e1db2d152e8485c1f04e6;p=tahoe-lafs%2Ftahoe-lafs.git

ftp: change the twisted hack necessary for async-write-close, to one more agreeable to the twisted-dev folks, add a copy of the necessary patch to docs/ftp.txt
---

diff --git a/docs/ftp.txt b/docs/ftp.txt
index 0c8a57a2..4c392b4e 100644
--- a/docs/ftp.txt
+++ b/docs/ftp.txt
@@ -46,3 +46,68 @@ add the following lines to BASEDIR/tahoe.cfg:
 The FTP server requires Twisted's "vfs" component, which is not included in
 the Twisted-8.1.0 release. If there is no newer release available, it may be
 necessary to run Twisted from the SVN trunk to obtain this component.
+
+In addition, the following patch must be applied (as of r24943) to enable
+asynchronous closing of file-upload operations:
+
+Index: twisted/protocols/ftp.py
+===================================================================
+--- twisted/protocols/ftp.py	(revision 24956)
++++ twisted/protocols/ftp.py	(working copy)
+@@ -1049,7 +1049,6 @@
+                 cons = ASCIIConsumerWrapper(cons)
+ 
+             d = self.dtpInstance.registerConsumer(cons)
+-            d.addCallbacks(cbSent, ebSent)
+ 
+             # Tell them what to doooo
+             if self.dtpInstance.isConnected:
+@@ -1062,6 +1061,8 @@
+         def cbOpened(file):
+             d = file.receive()
+             d.addCallback(cbConsumer)
++            d.addCallback(lambda ignored: file.close())
++            d.addCallbacks(cbSent, ebSent)
+             return d
+ 
+         def ebOpened(err):
+@@ -1434,7 +1435,14 @@
+         @rtype: C{Deferred} of C{IConsumer}
+         """
+ 
++    def close():
++        """
++        Perform any post-write work that needs to be done. This method may
++        only be invoked once on each provider, and will always be invoked
++        after receive().
+ 
++        @rtype: C{Deferred} of anything: the value is ignored
++        """
+ 
+ def _getgroups(uid):
+     """Return the primary and supplementary groups for the given UID.
+@@ -1795,6 +1803,8 @@
+         # FileConsumer will close the file object
+         return defer.succeed(FileConsumer(self.fObj))
+ 
++    def close(self):
++        return defer.succeed(None)
+ 
+ 
+ class FTPRealm:
+Index: twisted/vfs/adapters/ftp.py
+===================================================================
+--- twisted/vfs/adapters/ftp.py	(revision 24956)
++++ twisted/vfs/adapters/ftp.py	(working copy)
+@@ -295,6 +295,11 @@
+         """
+         return defer.succeed(IConsumer(self.node))
+ 
++    def close(self):
++        """
++        Perform post-write actions.
++        """
++        return defer.succeed(None)
+ 
+ 
+ class _FileToConsumerAdapter(object):
diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index ccfe9b1e..0f323b82 100644
--- a/src/allmydata/client.py
+++ b/src/allmydata/client.py
@@ -255,15 +255,14 @@ class Client(node.Node, testutil.PollMixin):
         self.add_service(ws)
 
     def init_ftp_server(self):
-        if not self.get_config("ftpd", "enabled", False, boolean=True):
-            return
-        portstr = self.get_config("ftpd", "ftp.port", "8021")
-        accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
-        accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
-
-        from allmydata import ftpd
-        s = ftpd.FTPServer(self, portstr, accountfile, accounturl)
-        s.setServiceParent(self)
+        if self.get_config("ftpd", "enabled", False, boolean=True):
+            accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
+            accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
+            ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
+
+            from allmydata import ftpd
+            s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
+            s.setServiceParent(self)
 
     def _check_hotline(self, hotline_file):
         if os.path.exists(hotline_file):
diff --git a/src/allmydata/ftpd.py b/src/allmydata/ftpd.py
index 89c3ce2e..eb475c91 100644
--- a/src/allmydata/ftpd.py
+++ b/src/allmydata/ftpd.py
@@ -26,62 +26,39 @@ class ReadFile:
 
 class FileWriter:
     implements(IConsumer)
-    def __init__(self, parent, childname, convergence):
-        self.parent = parent
-        self.childname = childname
-        self.convergence = convergence
 
     def registerProducer(self, producer, streaming):
-        self.producer = producer
         if not streaming:
             raise NotImplementedError("Non-streaming producer not supported.")
         # we write the data to a temporary file, since Tahoe can't do
         # streaming upload yet.
         self.f = tempfile.TemporaryFile()
         return None
+
     def unregisterProducer(self):
-        # now we do the upload.
-
-        # bummer: unregisterProducer is run synchronously. twisted's FTP
-        # server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our
-        # return value, and sends the 226 Transfer Complete right after
-        # unregisterProducer returns. The client will believe that the
-        # transfer is indeed complete, whereas for us it is just starting.
-        # Some clients will do an immediate LIST to see if the file was
-        # really uploaded.
-        u = FileHandle(self.f, self.convergence)
-        d = self.parent.add_file(self.childname, u)
-        # by patching twisted.protocols.ftp.DTP._unregConsumer to pass this
-        # Deferred back, we can obtain the async-upload that we desire.
-        return d
+        # the upload actually happens in WriteFile.close()
+        pass
 
     def write(self, data):
-        # if streaming==True, then the sender/producer is in charge. We are
-        # allowed to call self.producer.pauseProducing() when we have too
-        # much data and want them to stop, but they might not listen to us.
-        # We must then call self.producer.resumeProducing() when we can
-        # accomodate more.
-        #
-        # if streaming==False, then we (the consumer) are in charge. We must
-        # keep calling p.resumeProducing() (which will prompt them to call
-        # our .write again) until they finish.
-        #
-        # If we experience an error, call p.stopProducing().
         self.f.write(data)
 
 class WriteFile:
     implements(ftp.IWriteFile)
+
     def __init__(self, parent, childname, convergence):
         self.parent = parent
         self.childname = childname
         self.convergence = convergence
+
     def receive(self):
-        try:
-            c = FileWriter(self.parent, self.childname, self.convergence)
-        except:
-            log.err()
-            raise
-        return defer.succeed(c)
+        self.c = FileWriter()
+        return defer.succeed(self.c)
+
+    def close(self):
+        u = FileHandle(self.c.f, self.convergence)
+        d = self.parent.add_file(self.childname, u)
+        return d
+
 
 class NoParentError(Exception):
     pass
@@ -392,7 +369,7 @@ class Dispatcher:
 
 
 class FTPServer(service.MultiService):
-    def __init__(self, client, portstr, accountfile, accounturl):
+    def __init__(self, client, accountfile, accounturl, ftp_portstr):
         service.MultiService.__init__(self)
 
         if accountfile:
@@ -408,5 +385,5 @@ class FTPServer(service.MultiService):
         p.registerChecker(c)
         f = ftp.FTPFactory(p)
 
-        s = strports.service(portstr, f)
+        s = strports.service(ftp_portstr, f)
         s.setServiceParent(self)