From: Brian Warner Date: Mon, 6 Oct 2008 19:52:36 +0000 (-0700) Subject: ftp server: initial implementation. Still needs unit tests, custom Twisted patches... X-Git-Url: https://git.rkrishnan.org/specifications/components/com_hotproperty/reliability?a=commitdiff_plain;h=bc237b3956e8f96c89e9e2925d5859d1862ae117;p=tahoe-lafs%2Ftahoe-lafs.git ftp server: initial implementation. Still needs unit tests, custom Twisted patches. For #512 --- diff --git a/docs/ftp.txt b/docs/ftp.txt new file mode 100644 index 00000000..0c8a57a2 --- /dev/null +++ b/docs/ftp.txt @@ -0,0 +1,48 @@ += Tahoe FTP Frontend = + +All Tahoe client nodes can run a frontend FTP server, allowing regular FTP +clients to access the virtual filesystem. + +Since Tahoe does not use user accounts or passwords, the FTP server must be +configured with a way to translate USER+PASS into a root directory cap. Two +mechanisms are provided. The first is a simple flat file with one account per +line. The second is an HTTP-based login mechanism, backed by simple PHP +script and a database. The latter form is used by allmydata.com to provide +secure access to customer rootcaps. + +== Configuring an Account File == + +To configure the first form, create a file (probably in +BASEDIR/private/ftp.accounts) in which each non-comment/non-blank line is a +space-separated line of (USERNAME, PASSWORD, ROOTCAP), like so: + + % cat BASEDIR/private/ftp.accounts + # This is a password file, (username, password, rootcap) + alice password URI:DIR2:ioej8xmzrwilg772gzj4fhdg7a:wtiizszzz2rgmczv4wl6bqvbv33ag4kvbr6prz3u6w3geixa6m6a + bob sekrit URI:DIR2:6bdmeitystckbl9yqlw7g56f4e:serp5ioqxnh34mlbmzwvkp3odehsyrr7eytt5f64we3k9hhcrcja + +Then add the following lines to the BASEDIR/tahoe.cfg file: + + [ftpd] + enabled = true + ftp.port = 8021 + ftp.accounts.file = private/ftp.accounts + +The FTP server will listen on the given port number. The ftp.accounts.file +pathname will be interpreted relative to the node's BASEDIR. + +== Configuring an Account Server == + +Determine the URL of the account server, say https://example.com/login . Then +add the following lines to BASEDIR/tahoe.cfg: + + [ftpd] + enabled = true + ftp.port = 8021 + ftp.accounts.url = https://example.com/login + +== Dependencies == + +The FTP server requires Twisted's "vfs" component, which is not included in +the Twisted-8.1.0 release. If there is no newer release available, it may be +necessary to run Twisted from the SVN trunk to obtain this component. diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 42c6a9c8..ccfe9b1e 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -78,6 +78,7 @@ class Client(node.Node, testutil.PollMixin): if key_gen_furl: self.init_key_gen(key_gen_furl) # ControlServer and Helper are attached after Tub startup + self.init_ftp_server() hotline_file = os.path.join(self.basedir, self.SUICIDE_PREVENTION_HOTLINE_FILE) @@ -253,6 +254,17 @@ class Client(node.Node, testutil.PollMixin): ws = WebishServer(webport, nodeurl_path) self.add_service(ws) + def init_ftp_server(self): + if not self.get_config("ftpd", "enabled", False, boolean=True): + return + portstr = self.get_config("ftpd", "ftp.port", "8021") + accountfile = self.get_config("ftpd", "ftp.accounts.file", None) + accounturl = self.get_config("ftpd", "ftp.accounts.url", None) + + from allmydata import ftpd + s = ftpd.FTPServer(self, portstr, accountfile, accounturl) + s.setServiceParent(self) + def _check_hotline(self, hotline_file): if os.path.exists(hotline_file): mtime = os.stat(hotline_file)[stat.ST_MTIME] diff --git a/src/allmydata/ftpd.py b/src/allmydata/ftpd.py new file mode 100644 index 00000000..e19af52b --- /dev/null +++ b/src/allmydata/ftpd.py @@ -0,0 +1,367 @@ + +import os +import tempfile +from zope.interface import implements +from twisted.application import service, strports +from twisted.internet import defer +from twisted.internet.interfaces import IConsumer +from twisted.protocols import ftp +from twisted.cred import portal, checkers +from twisted.vfs import pathutils +from twisted.vfs.backends import osfs +from twisted.python import log + +from allmydata.interfaces import IDirectoryNode, ExistingChildError +from allmydata.immutable.download import ConsumerAdapter +from allmydata.immutable.upload import FileHandle + +class ReadFile: + implements(ftp.IReadFile) + def __init__(self, node): + self.node = node + def send(self, consumer): + print "SEND", consumer + ad = ConsumerAdapter(consumer) + d = self.node.download(ad) + def _downloaded(res): + print "DONE" + d.addCallback(_downloaded) + return d # when consumed + +class FileWriter: + implements(IConsumer) + def __init__(self, parent, childname, convergence): + self.parent = parent + self.childname = childname + self.convergence = convergence + + def registerProducer(self, producer, streaming): + self.producer = producer + print "producer", producer + print "streaming", streaming + if not streaming: + raise NotImplementedError("Non-streaming producer not supported.") + # we write the data to a temporary file, since Tahoe can't do + # streaming upload yet. + self.f = tempfile.TemporaryFile() + return None + def unregisterProducer(self): + # now we do the upload. + + # bummer: unregisterProducer is run synchronously. twisted's FTP + # server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our + # return value, and sends the 226 Transfer Complete right after + # unregisterProducer returns. The client will believe that the + # transfer is indeed complete, whereas for us it is just starting. + # Some clients will do an immediate LIST to see if the file was + # really uploaded. + print "UPLOAD STSARTING" + u = FileHandle(self.f, self.convergence) + d = self.parent.add_file(self.childname, u) + def _done(res): + print "UPLOAD DONE", res + d.addCallback(_done) + # by patching twisted.protocols.ftp.DTP._unregConsumer to pass this + # Deferred back, we can obtain the async-upload that we desire. + return d + + def write(self, data): + print "write(%d)" % len(data) + # if streaming==True, then the sender/producer is in charge. We are + # allowed to call self.producer.pauseProducing() when we have too + # much data and want them to stop, but they might not listen to us. + # We must then call self.producer.resumeProducing() when we can + # accomodate more. + # + # if streaming==False, then we (the consumer) are in charge. We must + # keep calling p.resumeProducing() (which will prompt them to call + # our .write again) until they finish. + # + # If we experience an error, call p.stopProducing(). + self.f.write(data) + +class WriteFile: + implements(ftp.IWriteFile) + def __init__(self, parent, childname, convergence): + self.parent = parent + self.childname = childname + self.convergence = convergence + def receive(self): + print "RECEIVE" + try: + c = FileWriter(self.parent, self.childname, self.convergence) + except: + print "PROBLEM" + log.err() + raise + return defer.succeed(c) + +class NoParentError(Exception): + pass + +class Handler: + implements(ftp.IFTPShell) + def __init__(self, client, rootnode, username, convergence): + self.client = client + self.root = rootnode + self.username = username + self.convergence = convergence + + def makeDirectory(self, path): + print "MAKEDIR", path + d = self._get_root(path) + d.addCallback(lambda (root,path): + self._get_or_create_directories(root, path)) + return d + + def _get_or_create_directories(self, node, path): + if not IDirectoryNode.providedBy(node): + # unfortunately it is too late to provide the name of the + # blocking directory in the error message. + raise ftp.FileExistsError("cannot create directory because there " + "is a file in the way") + if not path: + return defer.succeed(node) + d = node.get(path[0]) + def _maybe_create(f): + f.trap(KeyError) + return node.create_empty_directory(path[0]) + d.addErrback(_maybe_create) + d.addCallback(self._get_or_create_directories, path[1:]) + return d + + def _get_parent(self, path): + # fire with (parentnode, childname) + path = [unicode(p) for p in path] + if not path: + raise NoParentError + childname = path[-1] + d = self._get_root(path) + def _got_root((root, path)): + print "GOT ROOT", root, path + if not path: + raise NoParentError + return root.get_child_at_path(path[:-1]) + d.addCallback(_got_root) + def _got_parent(parent): + return (parent, childname) + d.addCallback(_got_parent) + return d + + def _remove_thing(self, path, must_be_directory=False, must_be_file=False): + d = defer.maybeDeferred(self._get_parent, path) + def _convert_error(f): + f.trap(NoParentError) + raise ftp.PermissionDeniedError("cannot delete root directory") + d.addErrback(_convert_error) + def _got_parent( (parent, childname) ): + print "GOT PARENT", parent + d = parent.get(childname) + def _got_child(child): + print "GOT HILD", child + if must_be_directory and not IDirectoryNode.providedBy(child): + raise ftp.IsNotADirectoryError("rmdir called on a file") + if must_be_file and IDirectoryNode.providedBy(child): + raise ftp.IsADirectoryError("rmfile called on a directory") + return parent.delete(childname) + d.addCallback(_got_child) + d.addErrback(self._convert_error) + return d + d.addCallback(_got_parent) + return d + + def removeDirectory(self, path): + print "RMDIR", path + return self._remove_thing(path, must_be_directory=True) + + def removeFile(self, path): + print "RM", path + return self._remove_thing(path, must_be_file=True) + + def rename(self, fromPath, toPath): + print "MV", fromPath, toPath + # the target directory must already exist + d = self._get_parent(fromPath) + def _got_from_parent( (fromparent, childname) ): + d = self._get_parent(toPath) + d.addCallback(lambda (toparent, tochildname): + fromparent.move_child_to(childname, + toparent, tochildname, + overwrite=False)) + return d + d.addCallback(_got_from_parent) + d.addErrback(self._convert_error) + return d + + def access(self, path): + print "ACCESS", path + # we allow access to everything that exists. We are required to raise + # an error for paths that don't exist: FTP clients (at least ncftp) + # uses this to decide whether to mkdir or not. + d = self._get_node_and_metadata_for_path(path) + d.addErrback(self._convert_error) + d.addCallback(lambda res: None) + return d + + def _convert_error(self, f): + if f.check(KeyError): + childname = f.value.args[0].encode("utf-8") + msg = "'%s' doesn't exist" % childname + raise ftp.FileNotFoundError(msg) + if f.check(ExistingChildError): + msg = f.value.args[0].encode("utf-8") + raise ftp.FileExistsError(msg) + return f + + def _get_root(self, path): + # return (root, remaining_path) + path = [unicode(p) for p in path] + if path and path[0] == "uri": + d = defer.maybeDeferred(self.client.create_node_from_uri, + str(path[1])) + d.addCallback(lambda root: (root, path[2:])) + else: + d = defer.succeed((self.root,path)) + return d + + def _get_node_and_metadata_for_path(self, path): + d = self._get_root(path) + def _got_root((root,path)): + if path: + return root.get_child_and_metadata_at_path(path) + else: + return (root,{}) + d.addCallback(_got_root) + return d + + def _populate_row(self, keys, (childnode, metadata)): + print childnode.get_uri(), metadata + values = [] + isdir = bool(IDirectoryNode.providedBy(childnode)) + for key in keys: + if key == "size": + if isdir: + value = 0 + else: + value = childnode.get_size() + elif key == "directory": + value = isdir + elif key == "permissions": + value = 0600 + elif key == "hardlinks": + value = 1 + elif key == "modified": + value = metadata.get("mtime", 0) + elif key == "owner": + value = self.username + elif key == "group": + value = self.username + else: + value = "??" + values.append(value) + return values + + def stat(self, path, keys=()): + # for files only, I think + print "STAT", path, keys + d = self._get_node_and_metadata_for_path(path) + def _render((node,metadata)): + assert not IDirectoryNode.providedBy(node) + return self._populate_row(keys, (node,metadata)) + d.addCallback(_render) + d.addErrback(self._convert_error) + return d + + def list(self, path, keys=()): + print "LIST", repr(path), repr(keys) + # the interface claims that path is a list of unicodes, but in + # practice it is not + d = self._get_node_and_metadata_for_path(path) + def _list((node, metadata)): + if IDirectoryNode.providedBy(node): + return node.list() + return { path[-1]: (node, metadata) } # need last-edge metadata + d.addCallback(_list) + def _render(children): + results = [] + for (name, childnode) in children.iteritems(): + # the interface claims that the result should have a unicode + # object as the name, but it fails unless you give it a + # bytestring + results.append( (name.encode("utf-8"), + self._populate_row(keys, childnode) ) ) + print repr(results) + return results + d.addCallback(_render) + d.addErrback(self._convert_error) + return d + + def openForReading(self, path): + print "OPEN-READ", path + d = self._get_node_and_metadata_for_path(path) + d.addCallback(lambda (node,metadata): ReadFile(node)) + d.addErrback(self._convert_error) + return d + + def openForWriting(self, path): + print "OPEN-WRITE", path + path = [unicode(p) for p in path] + if not path: + raise ftp.PermissionDeniedError("cannot STOR to root directory") + childname = path[-1] + d = self._get_root(path) + def _got_root((root, path)): + print "GOT ROOT", root, path + if not path: + raise ftp.PermissionDeniedError("cannot STOR to root directory") + return root.get_child_at_path(path[:-1]) + d.addCallback(_got_root) + def _got_parent(parent): + return WriteFile(parent, childname, self.convergence) + d.addCallback(_got_parent) + return d + + +from twisted.vfs.adapters import ftp as vftp # register the IFTPShell adapter +_hush_pyflakes = [vftp] +fs = pathutils.FileSystem(osfs.OSDirectory("/tmp/ftp-test")) + +class Dispatcher: + implements(portal.IRealm) + def __init__(self, client, accounts): + self.client = client + self.accounts = accounts + def requestAvatar(self, avatarID, mind, interface): + assert interface == ftp.IFTPShell + print "REQUEST", avatarID, mind, interface + pw, rootcap = self.accounts[avatarID] + rootnode = self.client.create_node_from_uri(rootcap) + convergence = self.client.convergence + s = Handler(self.client, rootnode, avatarID, convergence) + return (interface, s, None) + + +class FTPServer(service.MultiService): + def __init__(self, client, portstr, accountfile, accounturl): + service.MultiService.__init__(self) + self.client = client + + assert not accounturl, "Not implemented yet" + assert accountfile, "Need accountfile" + self.accounts = {} + c = checkers.InMemoryUsernamePasswordDatabaseDontUse() + for line in open(os.path.expanduser(accountfile), "r"): + line = line.strip() + if line.startswith("#") or not line: + continue + name, passwd, rootcap = line.split() + self.accounts[name] = (passwd,rootcap) + c.addUser(name, passwd) + + r = Dispatcher(client, self.accounts) + p = portal.Portal(r) + p.registerChecker(c) + f = ftp.FTPFactory(p) + + s = strports.service(portstr, f) + s.setServiceParent(self) diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py index 52decdfc..f8d2a00a 100644 --- a/src/allmydata/immutable/download.py +++ b/src/allmydata/immutable/download.py @@ -7,7 +7,7 @@ from twisted.application import service from foolscap import DeadReferenceError from foolscap.eventual import eventually -from allmydata.util import base32, mathutil, hashutil, log +from allmydata.util import base32, mathutil, hashutil, log, observer from allmydata.util.assertutil import _assert from allmydata import codec, hashtree, storage, uri from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \ @@ -1039,6 +1039,37 @@ class FileHandle: def finish(self): return self._filehandle +class ConsumerAdapter: + implements(IDownloadTarget, IConsumer) + def __init__(self, consumer): + self._consumer = consumer + self._when_finished = observer.OneShotObserverList() + + def when_finished(self): + return self._when_finished.when_fired() + + def registerProducer(self, producer, streaming): + print "REG" + self._consumer.registerProducer(producer, streaming) + def unregisterProducer(self): + print "UNREG" + self._consumer.unregisterProducer() + + def open(self, size): + pass + def write(self, data): + self._consumer.write(data) + def close(self): + self._when_finished.fire(None) + + def fail(self, why): + self._when_finished.fire(why) + def register_canceller(self, cb): + pass + def finish(self): + return None + + class Downloader(service.MultiService): """I am a service that allows file downloading. """ diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 2b68d644..a1a66128 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -1,6 +1,7 @@ from zope.interface import implements from twisted.internet import defer +from twisted.internet.interfaces import IPushProducer, IConsumer from allmydata.interfaces import IFileNode, IFileURI, ICheckable from allmydata.immutable.checker import SimpleCHKFileChecker, \ SimpleCHKFileVerifier @@ -88,7 +89,14 @@ class FileNode(ImmutableFileNode): downloader = self._client.getServiceNamed("downloader") return downloader.download_to_data(self.get_uri()) - +class LiteralProducer: + implements(IPushProducer) + def resumeProducing(self): + print "LIT RESUME" + pass + def stopProducing(self): + print "LIT STOP" + pass class LiteralFileNode(ImmutableFileNode): @@ -116,8 +124,12 @@ class LiteralFileNode(ImmutableFileNode): def download(self, target): # note that this does not update the stats_provider data = self.u.data + if IConsumer.providedBy(target): + target.registerProducer(LiteralProducer(), True) target.open(len(data)) target.write(data) + if IConsumer.providedBy(target): + target.unregisterProducer() target.close() return defer.maybeDeferred(target.finish)