from twisted.web.client import getPage
from twisted.internet import defer
from twisted.cred import error, checkers, credentials
+from twisted.conch import error as conch_error
+from twisted.conch.ssh import keys
+
from allmydata.util import base32
class NeedRootcapLookupScheme(Exception):
class AccountFileChecker:
implements(checkers.ICredentialsChecker)
credentialInterfaces = (credentials.IUsernamePassword,
- credentials.IUsernameHashedPassword)
+ credentials.IUsernameHashedPassword,
+ credentials.ISSHPrivateKey)
def __init__(self, client, accountfile):
self.client = client
self.passwords = {}
name, passwd, rest = line.split(None, 2)
if passwd in ("ssh-dss", "ssh-rsa"):
bits = rest.split()
- keystring = " ".join(bits[-1])
+ keystring = " ".join([passwd] + bits[:-1])
rootcap = bits[-1]
self.pubkeys[name] = keystring
else:
return FTPAvatarID(username, self.rootcaps[username])
raise error.UnauthorizedLogin
- def requestAvatarId(self, credentials):
- if credentials.username in self.passwords:
- d = defer.maybeDeferred(credentials.checkPassword,
- self.passwords[credentials.username])
- d.addCallback(self._cbPasswordMatch, str(credentials.username))
- return d
+ def requestAvatarId(self, creds):
+ if credentials.ISSHPrivateKey.providedBy(creds):
+ # Re-using twisted.conch.checkers.SSHPublicKeyChecker here, rather
+ # than re-implementing all of the ISSHPrivateKey checking logic,
+ # would be better. That would require Twisted 14.1.0 or newer,
+ # though.
+ return self._checkKey(creds)
+ elif credentials.IUsernameHashedPassword.providedBy(creds):
+ return self._checkPassword(creds)
+ elif credentials.IUsernamePassword.providedBy(creds):
+ return self._checkPassword(creds)
+ else:
+ raise NotImplementedError()
+
+ def _checkPassword(self, creds):
+ """
+ Determine whether the password in the given credentials matches the
+ password in the account file.
+
+ Returns a Deferred that fires with the username if the password matches
+ or with an UnauthorizedLogin failure otherwise.
+ """
+ try:
+ correct = self.passwords[creds.username]
+ except KeyError:
+ return defer.fail(error.UnauthorizedLogin())
+
+ d = defer.maybeDeferred(creds.checkPassword, correct)
+ d.addCallback(self._cbPasswordMatch, str(creds.username))
+ return d
+
+ def _allowedKey(self, creds):
+ """
+ Determine whether the public key indicated by the given credentials is
+ one allowed to authenticate the username in those credentials.
+
+ Returns True if so, False otherwise.
+ """
+ return creds.blob == self.pubkeys.get(creds.username)
+
+ def _correctSignature(self, creds):
+ """
+ Determine whether the signature in the given credentials is the correct
+ signature for the data in those credentials.
+
+ Returns True if so, False otherwise.
+ """
+ key = keys.Key.fromString(creds.blob)
+ return key.verify(creds.signature, creds.sigData)
+
+ def _checkKey(self, creds):
+ """
+ Determine whether some key-based credentials correctly authenticates a
+ user.
+
+ Returns a Deferred that fires with the username if so or with an
+ UnauthorizedLogin failure otherwise.
+ """
+ if self._allowedKey(creds):
+ if creds.signature is None:
+ return defer.fail(conch_error.ValidPublicKey())
+ if self._correctSignature(creds):
+ return defer.succeed(creds.username)
return defer.fail(error.UnauthorizedLogin())
class AccountURLChecker:
from twisted.trial import unittest
from twisted.python import filepath
from twisted.cred import error, credentials
+from twisted.conch import error as conch_error
from twisted.conch.ssh import keys
from allmydata.frontends import auth
DUMMY_ACCOUNTS = u"""\
alice password URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
bob sekrit URI:DIR2:bbbbbbbbbbbbbbbbbbbbbbbbbb:2222222222222222222222222222222222222222222222222222
-carol %(key)s URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
-""".format(DUMMY_KEY.public().toString("openssh")).encode("ascii")
+carol {key} URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
+""".format(key=DUMMY_KEY.public().toString("openssh")).encode("ascii")
class AccountFileCheckerKeyTests(unittest.TestCase):
"""
avatarId = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(avatarId, error.UnauthorizedLogin)
+ def test_password_auth_user(self):
+ """
+ AccountFileChecker.requestAvatarId returns a Deferred that fires with
+ UnauthorizedLogin if called with an SSHPrivateKey object for a username
+ only associated with a password in the account file.
+ """
+ key_credentials = credentials.SSHPrivateKey(
+ b"alice", b"md5", None, None, None)
+ avatarId = self.checker.requestAvatarId(key_credentials)
+ return self.assertFailure(avatarId, error.UnauthorizedLogin)
+
def test_unrecognized_key(self):
"""
AccountFileChecker.requestAvatarId returns a Deferred that fires with
b"carol", b"md5", wrong_key_blob, None, None)
avatarId = self.checker.requestAvatarId(key_credentials)
return self.assertFailure(avatarId, error.UnauthorizedLogin)
+
+ def test_missing_signature(self):
+ """
+ AccountFileChecker.requestAvatarId returns a Deferred that fires with
+ ValidPublicKey if called with an SSHPrivateKey object with an
+ authorized key for the indicated user but with no signature.
+ """
+ right_key_blob = DUMMY_KEY.public().toString("openssh")
+ key_credentials = credentials.SSHPrivateKey(
+ b"carol", b"md5", right_key_blob, None, None)
+ avatarId = self.checker.requestAvatarId(key_credentials)
+ return self.assertFailure(avatarId, conch_error.ValidPublicKey)
+
+ def test_wrong_signature(self):
+ """
+ AccountFileChecker.requestAvatarId returns a Deferred that fires with
+ UnauthorizedLogin if called with an SSHPrivateKey object with a public
+ key matching that on the user's line in the account file but with the
+ wrong signature.
+ """
+ right_key_blob = DUMMY_KEY.public().toString("openssh")
+ key_credentials = credentials.SSHPrivateKey(
+ b"carol", b"md5", right_key_blob, b"signed data", b"wrong sig")
+ avatarId = self.checker.requestAvatarId(key_credentials)
+ return self.assertFailure(avatarId, error.UnauthorizedLogin)
+
+ def test_authenticated(self):
+ """
+ AccountFileChecker.requestAvatarId returns a Deferred that fires with
+ the username portion of the account file line that matches the username
+ and key blob portion of the SSHPrivateKey object if that object also
+ has a correct signature.
+ """
+ username = b"carol"
+ signed_data = b"signed data"
+ signature = DUMMY_KEY.sign(signed_data)
+ right_key_blob = DUMMY_KEY.public().toString("openssh")
+ key_credentials = credentials.SSHPrivateKey(
+ username, b"md5", right_key_blob, signed_data, signature)
+ avatarId = self.checker.requestAvatarId(key_credentials)
+ avatarId.addCallback(self.assertEqual, username)
+ return avatarId