From: Brian Warner Date: Tue, 7 Oct 2008 01:06:05 +0000 (-0700) Subject: ftp: change the twisted hack necessary for async-write-close, to one more agreeable... X-Git-Url: https://git.rkrishnan.org/pf/content/de.html?a=commitdiff_plain;h=05a83601770daa95282e1db2d152e8485c1f04e6;p=tahoe-lafs%2Ftahoe-lafs.git ftp: change the twisted hack necessary for async-write-close, to one more agreeable to the twisted-dev folks, add a copy of the necessary patch to docs/ftp.txt --- diff --git a/docs/ftp.txt b/docs/ftp.txt index 0c8a57a2..4c392b4e 100644 --- a/docs/ftp.txt +++ b/docs/ftp.txt @@ -46,3 +46,68 @@ add the following lines to BASEDIR/tahoe.cfg: The FTP server requires Twisted's "vfs" component, which is not included in the Twisted-8.1.0 release. If there is no newer release available, it may be necessary to run Twisted from the SVN trunk to obtain this component. + +In addition, the following patch must be applied (as of r24943) to enable +asynchronous closing of file-upload operations: + +Index: twisted/protocols/ftp.py +=================================================================== +--- twisted/protocols/ftp.py (revision 24956) ++++ twisted/protocols/ftp.py (working copy) +@@ -1049,7 +1049,6 @@ + cons = ASCIIConsumerWrapper(cons) + + d = self.dtpInstance.registerConsumer(cons) +- d.addCallbacks(cbSent, ebSent) + + # Tell them what to doooo + if self.dtpInstance.isConnected: +@@ -1062,6 +1061,8 @@ + def cbOpened(file): + d = file.receive() + d.addCallback(cbConsumer) ++ d.addCallback(lambda ignored: file.close()) ++ d.addCallbacks(cbSent, ebSent) + return d + + def ebOpened(err): +@@ -1434,7 +1435,14 @@ + @rtype: C{Deferred} of C{IConsumer} + """ + ++ def close(): ++ """ ++ Perform any post-write work that needs to be done. This method may ++ only be invoked once on each provider, and will always be invoked ++ after receive(). + ++ @rtype: C{Deferred} of anything: the value is ignored ++ """ + + def _getgroups(uid): + """Return the primary and supplementary groups for the given UID. +@@ -1795,6 +1803,8 @@ + # FileConsumer will close the file object + return defer.succeed(FileConsumer(self.fObj)) + ++ def close(self): ++ return defer.succeed(None) + + + class FTPRealm: +Index: twisted/vfs/adapters/ftp.py +=================================================================== +--- twisted/vfs/adapters/ftp.py (revision 24956) ++++ twisted/vfs/adapters/ftp.py (working copy) +@@ -295,6 +295,11 @@ + """ + return defer.succeed(IConsumer(self.node)) + ++ def close(self): ++ """ ++ Perform post-write actions. ++ """ ++ return defer.succeed(None) + + + class _FileToConsumerAdapter(object): diff --git a/src/allmydata/client.py b/src/allmydata/client.py index ccfe9b1e..0f323b82 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -255,15 +255,14 @@ class Client(node.Node, testutil.PollMixin): self.add_service(ws) def init_ftp_server(self): - if not self.get_config("ftpd", "enabled", False, boolean=True): - return - portstr = self.get_config("ftpd", "ftp.port", "8021") - accountfile = self.get_config("ftpd", "ftp.accounts.file", None) - accounturl = self.get_config("ftpd", "ftp.accounts.url", None) - - from allmydata import ftpd - s = ftpd.FTPServer(self, portstr, accountfile, accounturl) - s.setServiceParent(self) + if self.get_config("ftpd", "enabled", False, boolean=True): + accountfile = self.get_config("ftpd", "ftp.accounts.file", None) + accounturl = self.get_config("ftpd", "ftp.accounts.url", None) + ftp_portstr = self.get_config("ftpd", "ftp.port", "8021") + + from allmydata import ftpd + s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr) + s.setServiceParent(self) def _check_hotline(self, hotline_file): if os.path.exists(hotline_file): diff --git a/src/allmydata/ftpd.py b/src/allmydata/ftpd.py index 89c3ce2e..eb475c91 100644 --- a/src/allmydata/ftpd.py +++ b/src/allmydata/ftpd.py @@ -26,62 +26,39 @@ class ReadFile: class FileWriter: implements(IConsumer) - def __init__(self, parent, childname, convergence): - self.parent = parent - self.childname = childname - self.convergence = convergence def registerProducer(self, producer, streaming): - self.producer = producer if not streaming: raise NotImplementedError("Non-streaming producer not supported.") # we write the data to a temporary file, since Tahoe can't do # streaming upload yet. self.f = tempfile.TemporaryFile() return None + def unregisterProducer(self): - # now we do the upload. - - # bummer: unregisterProducer is run synchronously. twisted's FTP - # server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our - # return value, and sends the 226 Transfer Complete right after - # unregisterProducer returns. The client will believe that the - # transfer is indeed complete, whereas for us it is just starting. - # Some clients will do an immediate LIST to see if the file was - # really uploaded. - u = FileHandle(self.f, self.convergence) - d = self.parent.add_file(self.childname, u) - # by patching twisted.protocols.ftp.DTP._unregConsumer to pass this - # Deferred back, we can obtain the async-upload that we desire. - return d + # the upload actually happens in WriteFile.close() + pass def write(self, data): - # if streaming==True, then the sender/producer is in charge. We are - # allowed to call self.producer.pauseProducing() when we have too - # much data and want them to stop, but they might not listen to us. - # We must then call self.producer.resumeProducing() when we can - # accomodate more. - # - # if streaming==False, then we (the consumer) are in charge. We must - # keep calling p.resumeProducing() (which will prompt them to call - # our .write again) until they finish. - # - # If we experience an error, call p.stopProducing(). self.f.write(data) class WriteFile: implements(ftp.IWriteFile) + def __init__(self, parent, childname, convergence): self.parent = parent self.childname = childname self.convergence = convergence + def receive(self): - try: - c = FileWriter(self.parent, self.childname, self.convergence) - except: - log.err() - raise - return defer.succeed(c) + self.c = FileWriter() + return defer.succeed(self.c) + + def close(self): + u = FileHandle(self.c.f, self.convergence) + d = self.parent.add_file(self.childname, u) + return d + class NoParentError(Exception): pass @@ -392,7 +369,7 @@ class Dispatcher: class FTPServer(service.MultiService): - def __init__(self, client, portstr, accountfile, accounturl): + def __init__(self, client, accountfile, accounturl, ftp_portstr): service.MultiService.__init__(self) if accountfile: @@ -408,5 +385,5 @@ class FTPServer(service.MultiService): p.registerChecker(c) f = ftp.FTPFactory(p) - s = strports.service(portstr, f) + s = strports.service(ftp_portstr, f) s.setServiceParent(self)