]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_auth.py
b61531b1b31a043926c318f92f47d29f0d638a81
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_auth.py
1 from twisted.trial import unittest
2 from twisted.python import filepath
3 from twisted.cred import error, credentials
4 from twisted.conch import error as conch_error
5 from twisted.conch.ssh import keys
6
7 from allmydata.frontends import auth
8
# PEM text of an RSA private key that exists only for these tests.  It is kept
# in its own name so the parsing step below reads as a single short statement.
_DUMMY_KEY_PEM = """\
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDEP3DYiukOu+NrUlBZeLL9JoHkK5nSvINYfeOQWYVW9J5NG485
pZFVUQKzvvht34Ihj4ucrrvj7vOp+FFvzxI+zHKBpDxyJwV96dvWDAZMjxTxL7iV
8HcO7hqgtQ/Xk1Kjde5lH3EOEDs3IhFHA+sox9y6i4A5NUr2AJZSHiOEVwIDAQAB
AoGASrrNwefDr7SkeS2zIx7vKa8ML1LbFIBsk7n8ee9c8yvbTAl+lLkTiqV6ne/O
sig2aYk75MI1Eirf5o2ElUsI6u36i6AeKL2u/W7tLBVijmBB8dTiWZ5gMOARWt8w
daF2An2826YdcU+iNZ7Yi0q4xtlxHQn3JcNNWxicphLvt0ECQQDtajJ/bK+Nqd9j
/WGvqYcMzkkorQq/0+MQYhcIwDlpf2Xoi45tP4HeoBubeJmU5+jXpXmdP5epWpBv
k3ZCwV7pAkEA05xBP2HTdwRFTJov5I/w7uKOrn7mj7DCvSjQFCufyPOoCJJMeBSq
tfCQlHFtwlkyNfiSbhtgZ0Pp6ovL+1RBPwJBAOlFRBKxrpgpxcXQK5BWqMwrT/S4
eWxb+6mYR3ugq4h91Zq0rJ+pG6irdhS/XV/SsZRZEXIxDoom4u3OXQ9gQikCQErM
ywuaiuNhMRXY0uEaOHJYx1LLLLjSJKQ0zwiyOvMPnfAZtsojlAxoEtNGHSQ731HQ
ogIlzzfxe7ga3mni6IUCQQCwNK9zwARovcQ8nByqotGQzohpl+1b568+iw8GXP2u
dBSD8940XU3YW+oeq8e+p3yQ2GinHfeJ3BYQyNQLuMAJ
-----END RSA PRIVATE KEY-----
"""

# Parsed key object; the tests derive the matching public key and signatures
# from it.
DUMMY_KEY = keys.Key.fromString(_DUMMY_KEY_PEM)
26
# Contents of the account file handed to AccountFileChecker, as ASCII bytes.
# Each line names a user, that user's credential and a root cap: alice and bob
# have passwords, carol has the OpenSSH public key derived from DUMMY_KEY.
#
# Key.toString() yields a byte string, so it is decoded to text before being
# substituted into the (unicode) template.  Without the decode, Python 3's
# str.format() would insert the bytes repr ("b'ssh-rsa ...'") and carol's line
# would no longer contain her real public key.  On Python 2 the decode is a
# harmless str -> unicode conversion.
DUMMY_ACCOUNTS = u"""\
alice password URI:DIR2:aaaaaaaaaaaaaaaaaaaaaaaaaa:1111111111111111111111111111111111111111111111111111
bob sekrit URI:DIR2:bbbbbbbbbbbbbbbbbbbbbbbbbb:2222222222222222222222222222222222222222222222222222
carol {key} URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333
""".format(
    key=DUMMY_KEY.public().toString("openssh").decode("ascii"),
).encode("ascii")
32
class AccountFileCheckerKeyTests(unittest.TestCase):
    """
    Exercise the SSH public-key path of
    allmydata.frontends.auth.AccountFileChecker.
    """
    def setUp(self):
        # Every test gets its own copy of the fixture accounts on disk and a
        # checker reading from that file.
        accounts_path = filepath.FilePath(self.mktemp())
        accounts_path.setContent(DUMMY_ACCOUNTS)
        self.account_file = accounts_path
        self.checker = auth.AccountFileChecker(None, accounts_path.path)

    def _request_avatar(self, username, blob=None, sig_data=None,
                        signature=None):
        """
        Build SSHPrivateKey credentials (algorithm name fixed to b"md5", as in
        every test here) and hand them to the checker under test.

        Returns whatever AccountFileChecker.requestAvatarId returns, which the
        tests treat as a Deferred.
        """
        creds = credentials.SSHPrivateKey(
            username, b"md5", blob, sig_data, signature)
        return self.checker.requestAvatarId(creds)

    def test_unknown_user(self):
        """
        Key credentials naming a user that does not appear in the account file
        make requestAvatarId's Deferred fail with UnauthorizedLogin.
        """
        result = self._request_avatar(b"dennis")
        return self.assertFailure(result, error.UnauthorizedLogin)

    def test_password_auth_user(self):
        """
        Key credentials for a user whose account-file entry carries only a
        password make requestAvatarId's Deferred fail with UnauthorizedLogin.
        """
        result = self._request_avatar(b"alice")
        return self.assertFailure(result, error.UnauthorizedLogin)

    def test_unrecognized_key(self):
        """
        Key credentials presenting a public key different from the one listed
        for the user in the account file make requestAvatarId's Deferred fail
        with UnauthorizedLogin.
        """
        wrong_key_blob = b"""\
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAYQDJGMWlPXh2M3pYzTiamjcBIMqctt4VvLVW2QZgEFc86XhGjPXq5QAiRTKv9yVZJR9HW70CfBI7GHun8+v4Wb6aicWBoxgI3OB5NN+OUywdme2HSaif5yenFdQr0ME71Xs=
"""
        result = self._request_avatar(b"carol", wrong_key_blob)
        return self.assertFailure(result, error.UnauthorizedLogin)

    def test_missing_signature(self):
        """
        Key credentials carrying the user's authorized public key but no
        signature make requestAvatarId's Deferred fail with ValidPublicKey.
        """
        public_blob = DUMMY_KEY.public().toString("openssh")
        result = self._request_avatar(b"carol", public_blob)
        return self.assertFailure(result, conch_error.ValidPublicKey)

    def test_wrong_signature(self):
        """
        Key credentials carrying the user's authorized public key together
        with a signature that does not match make requestAvatarId's Deferred
        fail with UnauthorizedLogin.
        """
        public_blob = DUMMY_KEY.public().toString("openssh")
        result = self._request_avatar(
            b"carol", public_blob, b"signed data", b"wrong sig")
        return self.assertFailure(result, error.UnauthorizedLogin)

    def test_authenticated(self):
        """
        Key credentials with a username and public key found in the account
        file, plus a signature proving possession of the matching private
        key, make requestAvatarId's Deferred fire with an FTPAvatarID whose
        username and rootcap are those of that user.
        """
        username = b"carol"
        expected_rootcap = (
            b"URI:DIR2:cccccccccccccccccccccccccc:3333333333333333333333333333333333333333333333333333")
        payload = b"signed data"
        # Sign with the private half so the checker can verify against the
        # public half stored in the account file.
        good_signature = DUMMY_KEY.sign(payload)
        public_blob = DUMMY_KEY.public().toString("openssh")

        def check_avatar(avatar):
            self.assertEqual(
                (username, expected_rootcap),
                (avatar.username, avatar.rootcap))

        result = self._request_avatar(
            username, public_blob, payload, good_signature)
        result.addCallback(check_avatar)
        return result