The FTP server requires Twisted's "vfs" component, which is not included in
the Twisted-8.1.0 release. If there is no newer release available, it may be
necessary to run Twisted from the SVN trunk to obtain this component.
+
+In addition, the following patch must be applied (as of r24943) to enable
+asynchronous closing of file-upload operations:
+
+Index: twisted/protocols/ftp.py
+===================================================================
+--- twisted/protocols/ftp.py (revision 24956)
++++ twisted/protocols/ftp.py (working copy)
+@@ -1049,7 +1049,6 @@
+ cons = ASCIIConsumerWrapper(cons)
+
+ d = self.dtpInstance.registerConsumer(cons)
+- d.addCallbacks(cbSent, ebSent)
+
+ # Tell them what to doooo
+ if self.dtpInstance.isConnected:
+@@ -1062,6 +1061,8 @@
+ def cbOpened(file):
+ d = file.receive()
+ d.addCallback(cbConsumer)
++ d.addCallback(lambda ignored: file.close())
++ d.addCallbacks(cbSent, ebSent)
+ return d
+
+ def ebOpened(err):
+@@ -1434,7 +1435,14 @@
+ @rtype: C{Deferred} of C{IConsumer}
+ """
+
++ def close():
++ """
++ Perform any post-write work that needs to be done. This method may
++ only be invoked once on each provider, and will always be invoked
++ after receive().
+
++ @rtype: C{Deferred} of anything: the value is ignored
++ """
+
+ def _getgroups(uid):
+ """Return the primary and supplementary groups for the given UID.
+@@ -1795,6 +1803,8 @@
+ # FileConsumer will close the file object
+ return defer.succeed(FileConsumer(self.fObj))
+
++ def close(self):
++ return defer.succeed(None)
+
+
+ class FTPRealm:
+Index: twisted/vfs/adapters/ftp.py
+===================================================================
+--- twisted/vfs/adapters/ftp.py (revision 24956)
++++ twisted/vfs/adapters/ftp.py (working copy)
+@@ -295,6 +295,11 @@
+ """
+ return defer.succeed(IConsumer(self.node))
+
++ def close(self):
++ """
++ Perform post-write actions.
++ """
++ return defer.succeed(None)
+
+
+ class _FileToConsumerAdapter(object):
self.add_service(ws)
def init_ftp_server(self):
- if not self.get_config("ftpd", "enabled", False, boolean=True):
- return
- portstr = self.get_config("ftpd", "ftp.port", "8021")
- accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
- accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
-
- from allmydata import ftpd
- s = ftpd.FTPServer(self, portstr, accountfile, accounturl)
- s.setServiceParent(self)
+ if self.get_config("ftpd", "enabled", False, boolean=True):
+ accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
+ accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
+ ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
+
+ from allmydata import ftpd
+ s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
+ s.setServiceParent(self)
def _check_hotline(self, hotline_file):
if os.path.exists(hotline_file):
class FileWriter:
implements(IConsumer)
- def __init__(self, parent, childname, convergence):
- self.parent = parent
- self.childname = childname
- self.convergence = convergence
def registerProducer(self, producer, streaming):
- self.producer = producer
if not streaming:
raise NotImplementedError("Non-streaming producer not supported.")
# we write the data to a temporary file, since Tahoe can't do
# streaming upload yet.
self.f = tempfile.TemporaryFile()
return None
+
def unregisterProducer(self):
- # now we do the upload.
-
- # bummer: unregisterProducer is run synchronously. twisted's FTP
- # server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our
- # return value, and sends the 226 Transfer Complete right after
- # unregisterProducer returns. The client will believe that the
- # transfer is indeed complete, whereas for us it is just starting.
- # Some clients will do an immediate LIST to see if the file was
- # really uploaded.
- u = FileHandle(self.f, self.convergence)
- d = self.parent.add_file(self.childname, u)
- # by patching twisted.protocols.ftp.DTP._unregConsumer to pass this
- # Deferred back, we can obtain the async-upload that we desire.
- return d
+ # the upload actually happens in WriteFile.close()
+ pass
def write(self, data):
- # if streaming==True, then the sender/producer is in charge. We are
- # allowed to call self.producer.pauseProducing() when we have too
- # much data and want them to stop, but they might not listen to us.
- # We must then call self.producer.resumeProducing() when we can
- # accomodate more.
- #
- # if streaming==False, then we (the consumer) are in charge. We must
- # keep calling p.resumeProducing() (which will prompt them to call
- # our .write again) until they finish.
- #
- # If we experience an error, call p.stopProducing().
self.f.write(data)
class WriteFile:
implements(ftp.IWriteFile)
+
def __init__(self, parent, childname, convergence):
self.parent = parent
self.childname = childname
self.convergence = convergence
+
def receive(self):
- try:
- c = FileWriter(self.parent, self.childname, self.convergence)
- except:
- log.err()
- raise
- return defer.succeed(c)
+ self.c = FileWriter()
+ return defer.succeed(self.c)
+
+ def close(self):
+ u = FileHandle(self.c.f, self.convergence)
+ d = self.parent.add_file(self.childname, u)
+ return d
+
class NoParentError(Exception):
pass
class FTPServer(service.MultiService):
- def __init__(self, client, portstr, accountfile, accounturl):
+ def __init__(self, client, accountfile, accounturl, ftp_portstr):
service.MultiService.__init__(self)
if accountfile:
p.registerChecker(c)
f = ftp.FTPFactory(p)
- s = strports.service(portstr, f)
+ s = strports.service(ftp_portstr, f)
s.setServiceParent(self)