rootcap = rest
self.rootcaps[name] = rootcap
+ def _avatarId(self, username):
+ return FTPAvatarID(username, self.rootcaps[username])
+
def _cbPasswordMatch(self, matched, username):
if matched:
- return FTPAvatarID(username, self.rootcaps[username])
+ return self._avatarId(username)
raise error.UnauthorizedLogin
def requestAvatarId(self, creds):
if creds.signature is None:
return defer.fail(conch_error.ValidPublicKey())
if self._correctSignature(creds):
- return defer.succeed(creds.username)
+ return defer.succeed(self._avatarId(creds.username))
return defer.fail(error.UnauthorizedLogin())
class AccountURLChecker:
def test_authenticated(self):
"""
- AccountFileChecker.requestAvatarId returns a Deferred that fires with
- the username portion of the account file line that matches the username
- and key blob portion of the SSHPrivateKey object if that object also
- has a correct signature.
+ If called with an SSHPrivateKey object with a username and public key
+ found in the account file and a signature that proves possession of the
+ corresponding private key, AccountFileChecker.requestAvatarId returns a
+ Deferred that fires with an FTPAvatarID giving the username and root
+ capability for that user.
"""
username = b"carol"
signed_data = b"signed data"
key_credentials = credentials.SSHPrivateKey(
username, b"md5", right_key_blob, signed_data, signature)
avatarId = self.checker.requestAvatarId(key_credentials)
- avatarId.addCallback(self.assertEqual, username)
+ def authenticated(avatarId):
+ self.assertEqual(
+ (username,
+ b"URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333"),
+ (avatarId.username, avatarId.rootcap))
+ avatarId.addCallback(authenticated)
return avatarId