from twisted.web.client import getPage
from twisted.internet import defer
from twisted.cred import error, checkers, credentials
+from twisted.conch import error as conch_error
+from twisted.conch.ssh import keys
+
from allmydata.util import base32
class NeedRootcapLookupScheme(Exception):
class AccountFileChecker:
implements(checkers.ICredentialsChecker)
credentialInterfaces = (credentials.IUsernamePassword,
- credentials.IUsernameHashedPassword)
+ credentials.IUsernameHashedPassword,
+ credentials.ISSHPrivateKey)
def __init__(self, client, accountfile):
self.client = client
self.passwords = {}
name, passwd, rest = line.split(None, 2)
if passwd in ("ssh-dss", "ssh-rsa"):
bits = rest.split()
- keystring = " ".join(bits[-1])
+ keystring = " ".join([passwd] + bits[:-1])
rootcap = bits[-1]
self.pubkeys[name] = keystring
else:
return FTPAvatarID(username, self.rootcaps[username])
raise error.UnauthorizedLogin
- def requestAvatarId(self, credentials):
- if credentials.username in self.passwords:
- d = defer.maybeDeferred(credentials.checkPassword,
- self.passwords[credentials.username])
- d.addCallback(self._cbPasswordMatch, str(credentials.username))
- return d
+ def requestAvatarId(self, creds):
+ if credentials.ISSHPrivateKey.providedBy(creds):
+ # Re-using twisted.conch.checkers.SSHPublicKeyChecker here, rather
+ # than re-implementing all of the ISSHPrivateKey checking logic,
+ # would be better. That would require Twisted 14.1.0 or newer,
+ # though.
+ return self._checkKey(creds)
+ elif credentials.IUsernameHashedPassword.providedBy(creds):
+ return self._checkPassword(creds)
+ elif credentials.IUsernamePassword.providedBy(creds):
+ return self._checkPassword(creds)
+ else:
+ raise NotImplementedError()
+
+ def _checkPassword(self, creds):
+ """
+ Determine whether the password in the given credentials matches the
+ password in the account file.
+
+ Returns a Deferred that fires with the username if the password matches
+ or with an UnauthorizedLogin failure otherwise.
+ """
+ try:
+ correct = self.passwords[creds.username]
+ except KeyError:
+ return defer.fail(error.UnauthorizedLogin())
+
+ d = defer.maybeDeferred(creds.checkPassword, correct)
+ d.addCallback(self._cbPasswordMatch, str(creds.username))
+ return d
+
+ def _allowedKey(self, creds):
+ """
+ Determine whether the public key indicated by the given credentials is
+ one allowed to authenticate the username in those credentials.
+
+ Returns True if so, False otherwise.
+ """
+ return creds.blob == self.pubkeys.get(creds.username)
+
+ def _correctSignature(self, creds):
+ """
+ Determine whether the signature in the given credentials is the correct
+ signature for the data in those credentials.
+
+ Returns True if so, False otherwise.
+ """
+ key = keys.Key.fromString(creds.blob)
+ return key.verify(creds.signature, creds.sigData)
+
+ def _checkKey(self, creds):
+ """
+ Determine whether some key-based credentials correctly authenticates a
+ user.
+
+ Returns a Deferred that fires with the username if so or with an
+ UnauthorizedLogin failure otherwise.
+ """
+ if self._allowedKey(creds):
+ if creds.signature is None:
+ return defer.fail(conch_error.ValidPublicKey())
+ if self._correctSignature(creds):
+ return defer.succeed(creds.username)
return defer.fail(error.UnauthorizedLogin())
class AccountURLChecker: