ftp server: initial implementation. Still needs unit tests, custom Twisted patches...
authorBrian Warner <warner@allmydata.com>
Mon, 6 Oct 2008 19:52:36 +0000 (12:52 -0700)
committerBrian Warner <warner@allmydata.com>
Mon, 6 Oct 2008 19:52:36 +0000 (12:52 -0700)
docs/ftp.txt [new file with mode: 0644]
src/allmydata/client.py
src/allmydata/ftpd.py [new file with mode: 0644]
src/allmydata/immutable/download.py
src/allmydata/immutable/filenode.py

diff --git a/docs/ftp.txt b/docs/ftp.txt
new file mode 100644 (file)
index 0000000..0c8a57a
--- /dev/null
@@ -0,0 +1,48 @@
+= Tahoe FTP Frontend =
+
+All Tahoe client nodes can run a frontend FTP server, allowing regular FTP
+clients to access the virtual filesystem.
+
+Since Tahoe does not use user accounts or passwords, the FTP server must be
+configured with a way to translate USER+PASS into a root directory cap. Two
+mechanisms are provided. The first is a simple flat file with one account per
+line. The second is an HTTP-based login mechanism, backed by simple PHP
+script and a database. The latter form is used by allmydata.com to provide
+secure access to customer rootcaps.
+
+== Configuring an Account File ==
+
+To configure the first form, create a file (probably in
+BASEDIR/private/ftp.accounts) in which each non-comment/non-blank line is a
+space-separated line of (USERNAME, PASSWORD, ROOTCAP), like so:
+
+ % cat BASEDIR/private/ftp.accounts
+ # This is a password file, (username, password, rootcap)
+ alice password URI:DIR2:ioej8xmzrwilg772gzj4fhdg7a:wtiizszzz2rgmczv4wl6bqvbv33ag4kvbr6prz3u6w3geixa6m6a
+ bob sekrit URI:DIR2:6bdmeitystckbl9yqlw7g56f4e:serp5ioqxnh34mlbmzwvkp3odehsyrr7eytt5f64we3k9hhcrcja
+
+Then add the following lines to the BASEDIR/tahoe.cfg file:
+
+ [ftpd]
+ enabled = true
+ ftp.port = 8021
+ ftp.accounts.file = private/ftp.accounts
+
+The FTP server will listen on the given port number. The ftp.accounts.file
+pathname will be interpreted relative to the node's BASEDIR.
+
+== Configuring an Account Server ==
+
+Determine the URL of the account server, say https://example.com/login . Then
+add the following lines to BASEDIR/tahoe.cfg:
+
+ [ftpd]
+ enabled = true
+ ftp.port = 8021
+ ftp.accounts.url = https://example.com/login
+
+== Dependencies ==
+
+The FTP server requires Twisted's "vfs" component, which is not included in
+the Twisted-8.1.0 release. If there is no newer release available, it may be
+necessary to run Twisted from the SVN trunk to obtain this component.
index 42c6a9c8f8b25c4a3a379f0232805630616c9767..ccfe9b1ea50ee9bc0a8b232b7b8063742c9e883d 100644 (file)
@@ -78,6 +78,7 @@ class Client(node.Node, testutil.PollMixin):
         if key_gen_furl:
             self.init_key_gen(key_gen_furl)
         # ControlServer and Helper are attached after Tub startup
+        self.init_ftp_server()
 
         hotline_file = os.path.join(self.basedir,
                                     self.SUICIDE_PREVENTION_HOTLINE_FILE)
@@ -253,6 +254,17 @@ class Client(node.Node, testutil.PollMixin):
         ws = WebishServer(webport, nodeurl_path)
         self.add_service(ws)
 
+    def init_ftp_server(self):
+        if not self.get_config("ftpd", "enabled", False, boolean=True):
+            return
+        portstr = self.get_config("ftpd", "ftp.port", "8021")
+        accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
+        accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
+
+        from allmydata import ftpd
+        s = ftpd.FTPServer(self, portstr, accountfile, accounturl)
+        s.setServiceParent(self)
+
     def _check_hotline(self, hotline_file):
         if os.path.exists(hotline_file):
             mtime = os.stat(hotline_file)[stat.ST_MTIME]
diff --git a/src/allmydata/ftpd.py b/src/allmydata/ftpd.py
new file mode 100644 (file)
index 0000000..e19af52
--- /dev/null
@@ -0,0 +1,367 @@
+
+import os
+import tempfile
+from zope.interface import implements
+from twisted.application import service, strports
+from twisted.internet import defer
+from twisted.internet.interfaces import IConsumer
+from twisted.protocols import ftp
+from twisted.cred import portal, checkers
+from twisted.vfs import pathutils
+from twisted.vfs.backends import osfs
+from twisted.python import log
+
+from allmydata.interfaces import IDirectoryNode, ExistingChildError
+from allmydata.immutable.download import ConsumerAdapter
+from allmydata.immutable.upload import FileHandle
+
+class ReadFile:
+    implements(ftp.IReadFile)
+    def __init__(self, node):
+        self.node = node
+    def send(self, consumer):
+        print "SEND", consumer
+        ad = ConsumerAdapter(consumer)
+        d = self.node.download(ad)
+        def _downloaded(res):
+            print "DONE"
+        d.addCallback(_downloaded)
+        return d # when consumed
+
+class FileWriter:
+    implements(IConsumer)
+    def __init__(self, parent, childname, convergence):
+        self.parent = parent
+        self.childname = childname
+        self.convergence = convergence
+
+    def registerProducer(self, producer, streaming):
+        self.producer = producer
+        print "producer", producer
+        print "streaming", streaming
+        if not streaming:
+            raise NotImplementedError("Non-streaming producer not supported.")
+        # we write the data to a temporary file, since Tahoe can't do
+        # streaming upload yet.
+        self.f = tempfile.TemporaryFile()
+        return None
+    def unregisterProducer(self):
+        # now we do the upload.
+
+        # bummer: unregisterProducer is run synchronously. twisted's FTP
+        # server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our
+        # return value, and sends the 226 Transfer Complete right after
+        # unregisterProducer returns. The client will believe that the
+        # transfer is indeed complete, whereas for us it is just starting.
+        # Some clients will do an immediate LIST to see if the file was
+        # really uploaded.
+        print "UPLOAD STSARTING"
+        u = FileHandle(self.f, self.convergence)
+        d = self.parent.add_file(self.childname, u)
+        def _done(res):
+            print "UPLOAD DONE", res
+        d.addCallback(_done)
+        # by patching twisted.protocols.ftp.DTP._unregConsumer to pass this
+        # Deferred back, we can obtain the async-upload that we desire.
+        return d
+
+    def write(self, data):
+        print "write(%d)" % len(data)
+        # if streaming==True, then the sender/producer is in charge. We are
+        # allowed to call self.producer.pauseProducing() when we have too
+        # much data and want them to stop, but they might not listen to us.
+        # We must then call self.producer.resumeProducing() when we can
+        # accomodate more.
+        #
+        # if streaming==False, then we (the consumer) are in charge. We must
+        # keep calling p.resumeProducing() (which will prompt them to call
+        # our .write again) until they finish.
+        #
+        # If we experience an error, call p.stopProducing().
+        self.f.write(data)
+
+class WriteFile:
+    implements(ftp.IWriteFile)
+    def __init__(self, parent, childname, convergence):
+        self.parent = parent
+        self.childname = childname
+        self.convergence = convergence
+    def receive(self):
+        print "RECEIVE"
+        try:
+            c = FileWriter(self.parent, self.childname, self.convergence)
+        except:
+            print "PROBLEM"
+            log.err()
+            raise
+        return defer.succeed(c)
+
+class NoParentError(Exception):
+    pass
+
+class Handler:
+    implements(ftp.IFTPShell)
+    def __init__(self, client, rootnode, username, convergence):
+        self.client = client
+        self.root = rootnode
+        self.username = username
+        self.convergence = convergence
+
+    def makeDirectory(self, path):
+        print "MAKEDIR", path
+        d = self._get_root(path)
+        d.addCallback(lambda (root,path):
+                      self._get_or_create_directories(root, path))
+        return d
+
+    def _get_or_create_directories(self, node, path):
+        if not IDirectoryNode.providedBy(node):
+            # unfortunately it is too late to provide the name of the
+            # blocking directory in the error message.
+            raise ftp.FileExistsError("cannot create directory because there "
+                                      "is a file in the way")
+        if not path:
+            return defer.succeed(node)
+        d = node.get(path[0])
+        def _maybe_create(f):
+            f.trap(KeyError)
+            return node.create_empty_directory(path[0])
+        d.addErrback(_maybe_create)
+        d.addCallback(self._get_or_create_directories, path[1:])
+        return d
+
+    def _get_parent(self, path):
+        # fire with (parentnode, childname)
+        path = [unicode(p) for p in path]
+        if not path:
+            raise NoParentError
+        childname = path[-1]
+        d = self._get_root(path)
+        def _got_root((root, path)):
+            print "GOT ROOT", root, path
+            if not path:
+                raise NoParentError
+            return root.get_child_at_path(path[:-1])
+        d.addCallback(_got_root)
+        def _got_parent(parent):
+            return (parent, childname)
+        d.addCallback(_got_parent)
+        return d
+
+    def _remove_thing(self, path, must_be_directory=False, must_be_file=False):
+        d = defer.maybeDeferred(self._get_parent, path)
+        def _convert_error(f):
+            f.trap(NoParentError)
+            raise ftp.PermissionDeniedError("cannot delete root directory")
+        d.addErrback(_convert_error)
+        def _got_parent( (parent, childname) ):
+            print "GOT PARENT", parent
+            d = parent.get(childname)
+            def _got_child(child):
+                print "GOT HILD", child
+                if must_be_directory and not IDirectoryNode.providedBy(child):
+                    raise ftp.IsNotADirectoryError("rmdir called on a file")
+                if must_be_file and IDirectoryNode.providedBy(child):
+                    raise ftp.IsADirectoryError("rmfile called on a directory")
+                return parent.delete(childname)
+            d.addCallback(_got_child)
+            d.addErrback(self._convert_error)
+            return d
+        d.addCallback(_got_parent)
+        return d
+
+    def removeDirectory(self, path):
+        print "RMDIR", path
+        return self._remove_thing(path, must_be_directory=True)
+
+    def removeFile(self, path):
+        print "RM", path
+        return self._remove_thing(path, must_be_file=True)
+
+    def rename(self, fromPath, toPath):
+        print "MV", fromPath, toPath
+        # the target directory must already exist
+        d = self._get_parent(fromPath)
+        def _got_from_parent( (fromparent, childname) ):
+            d = self._get_parent(toPath)
+            d.addCallback(lambda (toparent, tochildname):
+                          fromparent.move_child_to(childname,
+                                                   toparent, tochildname,
+                                                   overwrite=False))
+            return d
+        d.addCallback(_got_from_parent)
+        d.addErrback(self._convert_error)
+        return d
+
+    def access(self, path):
+        print "ACCESS", path
+        # we allow access to everything that exists. We are required to raise
+        # an error for paths that don't exist: FTP clients (at least ncftp)
+        # uses this to decide whether to mkdir or not.
+        d = self._get_node_and_metadata_for_path(path)
+        d.addErrback(self._convert_error)
+        d.addCallback(lambda res: None)
+        return d
+
+    def _convert_error(self, f):
+        if f.check(KeyError):
+            childname = f.value.args[0].encode("utf-8")
+            msg = "'%s' doesn't exist" % childname
+            raise ftp.FileNotFoundError(msg)
+        if f.check(ExistingChildError):
+            msg = f.value.args[0].encode("utf-8")
+            raise ftp.FileExistsError(msg)
+        return f
+
+    def _get_root(self, path):
+        # return (root, remaining_path)
+        path = [unicode(p) for p in path]
+        if path and path[0] == "uri":
+            d = defer.maybeDeferred(self.client.create_node_from_uri,
+                                    str(path[1]))
+            d.addCallback(lambda root: (root, path[2:]))
+        else:
+            d = defer.succeed((self.root,path))
+        return d
+
+    def _get_node_and_metadata_for_path(self, path):
+        d = self._get_root(path)
+        def _got_root((root,path)):
+            if path:
+                return root.get_child_and_metadata_at_path(path)
+            else:
+                return (root,{})
+        d.addCallback(_got_root)
+        return d
+
+    def _populate_row(self, keys, (childnode, metadata)):
+        print childnode.get_uri(), metadata
+        values = []
+        isdir = bool(IDirectoryNode.providedBy(childnode))
+        for key in keys:
+            if key == "size":
+                if isdir:
+                    value = 0
+                else:
+                    value = childnode.get_size()
+            elif key == "directory":
+                value = isdir
+            elif key == "permissions":
+                value = 0600
+            elif key == "hardlinks":
+                value = 1
+            elif key == "modified":
+                value = metadata.get("mtime", 0)
+            elif key == "owner":
+                value = self.username
+            elif key == "group":
+                value = self.username
+            else:
+                value = "??"
+            values.append(value)
+        return values
+
+    def stat(self, path, keys=()):
+        # for files only, I think
+        print "STAT", path, keys
+        d = self._get_node_and_metadata_for_path(path)
+        def _render((node,metadata)):
+            assert not IDirectoryNode.providedBy(node)
+            return self._populate_row(keys, (node,metadata))
+        d.addCallback(_render)
+        d.addErrback(self._convert_error)
+        return d
+
+    def list(self, path, keys=()):
+        print "LIST", repr(path), repr(keys)
+        # the interface claims that path is a list of unicodes, but in
+        # practice it is not
+        d = self._get_node_and_metadata_for_path(path)
+        def _list((node, metadata)):
+            if IDirectoryNode.providedBy(node):
+                return node.list()
+            return { path[-1]: (node, metadata) } # need last-edge metadata
+        d.addCallback(_list)
+        def _render(children):
+            results = []
+            for (name, childnode) in children.iteritems():
+                # the interface claims that the result should have a unicode
+                # object as the name, but it fails unless you give it a
+                # bytestring
+                results.append( (name.encode("utf-8"),
+                                 self._populate_row(keys, childnode) ) )
+            print repr(results)
+            return results
+        d.addCallback(_render)
+        d.addErrback(self._convert_error)
+        return d
+
+    def openForReading(self, path):
+        print "OPEN-READ", path
+        d = self._get_node_and_metadata_for_path(path)
+        d.addCallback(lambda (node,metadata): ReadFile(node))
+        d.addErrback(self._convert_error)
+        return d
+
+    def openForWriting(self, path):
+        print "OPEN-WRITE", path
+        path = [unicode(p) for p in path]
+        if not path:
+            raise ftp.PermissionDeniedError("cannot STOR to root directory")
+        childname = path[-1]
+        d = self._get_root(path)
+        def _got_root((root, path)):
+            print "GOT ROOT", root, path
+            if not path:
+                raise ftp.PermissionDeniedError("cannot STOR to root directory")
+            return root.get_child_at_path(path[:-1])
+        d.addCallback(_got_root)
+        def _got_parent(parent):
+            return WriteFile(parent, childname, self.convergence)
+        d.addCallback(_got_parent)
+        return d
+
+
+from twisted.vfs.adapters import ftp as vftp # register the IFTPShell adapter
+_hush_pyflakes = [vftp]
+fs = pathutils.FileSystem(osfs.OSDirectory("/tmp/ftp-test"))
+
+class Dispatcher:
+    implements(portal.IRealm)
+    def __init__(self, client, accounts):
+        self.client = client
+        self.accounts = accounts
+    def requestAvatar(self, avatarID, mind, interface):
+        assert interface == ftp.IFTPShell
+        print "REQUEST", avatarID, mind, interface
+        pw, rootcap = self.accounts[avatarID]
+        rootnode = self.client.create_node_from_uri(rootcap)
+        convergence = self.client.convergence
+        s = Handler(self.client, rootnode, avatarID, convergence)
+        return (interface, s, None)
+
+
+class FTPServer(service.MultiService):
+    def __init__(self, client, portstr, accountfile, accounturl):
+        service.MultiService.__init__(self)
+        self.client = client
+
+        assert not accounturl, "Not implemented yet"
+        assert accountfile, "Need accountfile"
+        self.accounts = {}
+        c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
+        for line in open(os.path.expanduser(accountfile), "r"):
+            line = line.strip()
+            if line.startswith("#") or not line:
+                continue
+            name, passwd, rootcap = line.split()
+            self.accounts[name] = (passwd,rootcap)
+            c.addUser(name, passwd)
+
+        r = Dispatcher(client, self.accounts)
+        p = portal.Portal(r)
+        p.registerChecker(c)
+        f = ftp.FTPFactory(p)
+
+        s = strports.service(portstr, f)
+        s.setServiceParent(self)
index 52decdfc3d5671a31d245077877c8d0e16090261..f8d2a00a12e015ea0b97dcf4bd048577785ddab1 100644 (file)
@@ -7,7 +7,7 @@ from twisted.application import service
 from foolscap import DeadReferenceError
 from foolscap.eventual import eventually
 
-from allmydata.util import base32, mathutil, hashutil, log
+from allmydata.util import base32, mathutil, hashutil, log, observer
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree, storage, uri
 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
@@ -1039,6 +1039,37 @@ class FileHandle:
     def finish(self):
         return self._filehandle
 
+class ConsumerAdapter:
+    implements(IDownloadTarget, IConsumer)
+    def __init__(self, consumer):
+        self._consumer = consumer
+        self._when_finished = observer.OneShotObserverList()
+
+    def when_finished(self):
+        return self._when_finished.when_fired()
+
+    def registerProducer(self, producer, streaming):
+        print "REG"
+        self._consumer.registerProducer(producer, streaming)
+    def unregisterProducer(self):
+        print "UNREG"
+        self._consumer.unregisterProducer()
+
+    def open(self, size):
+        pass
+    def write(self, data):
+        self._consumer.write(data)
+    def close(self):
+        self._when_finished.fire(None)
+
+    def fail(self, why):
+        self._when_finished.fire(why)
+    def register_canceller(self, cb):
+        pass
+    def finish(self):
+        return None
+
+
 class Downloader(service.MultiService):
     """I am a service that allows file downloading.
     """
index 2b68d644f7c20dc54c66c57f9add43bff63a6fb2..a1a66128ef171795f3606bac94ab6f60dbe9b148 100644 (file)
@@ -1,6 +1,7 @@
 
 from zope.interface import implements
 from twisted.internet import defer
+from twisted.internet.interfaces import IPushProducer, IConsumer
 from allmydata.interfaces import IFileNode, IFileURI, ICheckable
 from allmydata.immutable.checker import SimpleCHKFileChecker, \
      SimpleCHKFileVerifier
@@ -88,7 +89,14 @@ class FileNode(ImmutableFileNode):
         downloader = self._client.getServiceNamed("downloader")
         return downloader.download_to_data(self.get_uri())
 
-
+class LiteralProducer:
+    implements(IPushProducer)
+    def resumeProducing(self):
+        print "LIT RESUME"
+        pass
+    def stopProducing(self):
+        print "LIT STOP"
+        pass
 
 class LiteralFileNode(ImmutableFileNode):
 
@@ -116,8 +124,12 @@ class LiteralFileNode(ImmutableFileNode):
     def download(self, target):
         # note that this does not update the stats_provider
         data = self.u.data
+        if IConsumer.providedBy(target):
+            target.registerProducer(LiteralProducer(), True)
         target.open(len(data))
         target.write(data)
+        if IConsumer.providedBy(target):
+            target.unregisterProducer()
         target.close()
         return defer.maybeDeferred(target.finish)