]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/auth.py
Add the rest of the failure-case tests and a success-case test. Update the implement...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / auth.py
1 import os
2 from zope.interface import implements
3 from twisted.web.client import getPage
4 from twisted.internet import defer
5 from twisted.cred import error, checkers, credentials
6 from twisted.conch import error as conch_error
7 from twisted.conch.ssh import keys
8
9 from allmydata.util import base32
10
11 class NeedRootcapLookupScheme(Exception):
12     """Accountname+Password-based access schemes require some kind of
13     mechanism to translate name+passwd pairs into a rootcap, either a file of
14     name/passwd/rootcap tuples, or a server to do the translation."""
15
16 class FTPAvatarID:
17     def __init__(self, username, rootcap):
18         self.username = username
19         self.rootcap = rootcap
20
21 class AccountFileChecker:
22     implements(checkers.ICredentialsChecker)
23     credentialInterfaces = (credentials.IUsernamePassword,
24                             credentials.IUsernameHashedPassword,
25                             credentials.ISSHPrivateKey)
26     def __init__(self, client, accountfile):
27         self.client = client
28         self.passwords = {}
29         self.pubkeys = {}
30         self.rootcaps = {}
31         for line in open(os.path.expanduser(accountfile), "r"):
32             line = line.strip()
33             if line.startswith("#") or not line:
34                 continue
35             name, passwd, rest = line.split(None, 2)
36             if passwd in ("ssh-dss", "ssh-rsa"):
37                 bits = rest.split()
38                 keystring = " ".join([passwd] + bits[:-1])
39                 rootcap = bits[-1]
40                 self.pubkeys[name] = keystring
41             else:
42                 self.passwords[name] = passwd
43                 rootcap = rest
44             self.rootcaps[name] = rootcap
45
46     def _cbPasswordMatch(self, matched, username):
47         if matched:
48             return FTPAvatarID(username, self.rootcaps[username])
49         raise error.UnauthorizedLogin
50
51     def requestAvatarId(self, creds):
52         if credentials.ISSHPrivateKey.providedBy(creds):
53             # Re-using twisted.conch.checkers.SSHPublicKeyChecker here, rather
54             # than re-implementing all of the ISSHPrivateKey checking logic,
55             # would be better.  That would require Twisted 14.1.0 or newer,
56             # though.
57             return self._checkKey(creds)
58         elif credentials.IUsernameHashedPassword.providedBy(creds):
59             return self._checkPassword(creds)
60         elif credentials.IUsernamePassword.providedBy(creds):
61             return self._checkPassword(creds)
62         else:
63             raise NotImplementedError()
64
65     def _checkPassword(self, creds):
66         """
67         Determine whether the password in the given credentials matches the
68         password in the account file.
69
70         Returns a Deferred that fires with the username if the password matches
71         or with an UnauthorizedLogin failure otherwise.
72         """
73         try:
74             correct = self.passwords[creds.username]
75         except KeyError:
76             return defer.fail(error.UnauthorizedLogin())
77
78         d = defer.maybeDeferred(creds.checkPassword, correct)
79         d.addCallback(self._cbPasswordMatch, str(creds.username))
80         return d
81
82     def _allowedKey(self, creds):
83         """
84         Determine whether the public key indicated by the given credentials is
85         one allowed to authenticate the username in those credentials.
86
87         Returns True if so, False otherwise.
88         """
89         return creds.blob == self.pubkeys.get(creds.username)
90
91     def _correctSignature(self, creds):
92         """
93         Determine whether the signature in the given credentials is the correct
94         signature for the data in those credentials.
95
96         Returns True if so, False otherwise.
97         """
98         key = keys.Key.fromString(creds.blob)
99         return key.verify(creds.signature, creds.sigData)
100
101     def _checkKey(self, creds):
102         """
103         Determine whether some key-based credentials correctly authenticates a
104         user.
105
106         Returns a Deferred that fires with the username if so or with an
107         UnauthorizedLogin failure otherwise.
108         """
109         if self._allowedKey(creds):
110             if creds.signature is None:
111                 return defer.fail(conch_error.ValidPublicKey())
112             if self._correctSignature(creds):
113                 return defer.succeed(creds.username)
114         return defer.fail(error.UnauthorizedLogin())
115
116 class AccountURLChecker:
117     implements(checkers.ICredentialsChecker)
118     credentialInterfaces = (credentials.IUsernamePassword,)
119
120     def __init__(self, client, auth_url):
121         self.client = client
122         self.auth_url = auth_url
123
124     def _cbPasswordMatch(self, rootcap, username):
125         return FTPAvatarID(username, rootcap)
126
127     def post_form(self, username, password):
128         sepbase = base32.b2a(os.urandom(4))
129         sep = "--" + sepbase
130         form = []
131         form.append(sep)
132         fields = {"action": "authenticate",
133                   "email": username,
134                   "passwd": password,
135                   }
136         for name, value in fields.iteritems():
137             form.append('Content-Disposition: form-data; name="%s"' % name)
138             form.append('')
139             assert isinstance(value, str)
140             form.append(value)
141             form.append(sep)
142         form[-1] += "--"
143         body = "\r\n".join(form) + "\r\n"
144         headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
145                    }
146         return getPage(self.auth_url, method="POST",
147                        postdata=body, headers=headers,
148                        followRedirect=True, timeout=30)
149
150     def _parse_response(self, res):
151         rootcap = res.strip()
152         if rootcap == "0":
153             raise error.UnauthorizedLogin
154         return rootcap
155
156     def requestAvatarId(self, credentials):
157         # construct a POST to the login form. While this could theoretically
158         # be done with something like the stdlib 'email' package, I can't
159         # figure out how, so we just slam together a form manually.
160         d = self.post_form(credentials.username, credentials.password)
161         d.addCallback(self._parse_response)
162         d.addCallback(self._cbPasswordMatch, str(credentials.username))
163         return d
164