2 from zope.interface import implements
3 from twisted.web.client import getPage
4 from twisted.internet import defer
5 from twisted.cred import error, checkers, credentials
6 from twisted.conch import error as conch_error
7 from twisted.conch.ssh import keys
9 from allmydata.util import base32
11 class NeedRootcapLookupScheme(Exception):
12 """Accountname+Password-based access schemes require some kind of
13 mechanism to translate name+passwd pairs into a rootcap, either a file of
14 name/passwd/rootcap tuples, or a server to do the translation."""
17 def __init__(self, username, rootcap):
18 self.username = username
19 self.rootcap = rootcap
21 class AccountFileChecker:
22 implements(checkers.ICredentialsChecker)
23 credentialInterfaces = (credentials.IUsernamePassword,
24 credentials.IUsernameHashedPassword,
25 credentials.ISSHPrivateKey)
26 def __init__(self, client, accountfile):
31 for line in open(os.path.expanduser(accountfile), "r"):
33 if line.startswith("#") or not line:
35 name, passwd, rest = line.split(None, 2)
36 if passwd in ("ssh-dss", "ssh-rsa"):
38 keystring = " ".join([passwd] + bits[:-1])
40 self.pubkeys[name] = keystring
42 self.passwords[name] = passwd
44 self.rootcaps[name] = rootcap
46 def _cbPasswordMatch(self, matched, username):
48 return FTPAvatarID(username, self.rootcaps[username])
49 raise error.UnauthorizedLogin
51 def requestAvatarId(self, creds):
52 if credentials.ISSHPrivateKey.providedBy(creds):
53 # Re-using twisted.conch.checkers.SSHPublicKeyChecker here, rather
54 # than re-implementing all of the ISSHPrivateKey checking logic,
55 # would be better. That would require Twisted 14.1.0 or newer,
57 return self._checkKey(creds)
58 elif credentials.IUsernameHashedPassword.providedBy(creds):
59 return self._checkPassword(creds)
60 elif credentials.IUsernamePassword.providedBy(creds):
61 return self._checkPassword(creds)
63 raise NotImplementedError()
65 def _checkPassword(self, creds):
67 Determine whether the password in the given credentials matches the
68 password in the account file.
70 Returns a Deferred that fires with the username if the password matches
71 or with an UnauthorizedLogin failure otherwise.
74 correct = self.passwords[creds.username]
76 return defer.fail(error.UnauthorizedLogin())
78 d = defer.maybeDeferred(creds.checkPassword, correct)
79 d.addCallback(self._cbPasswordMatch, str(creds.username))
82 def _allowedKey(self, creds):
84 Determine whether the public key indicated by the given credentials is
85 one allowed to authenticate the username in those credentials.
87 Returns True if so, False otherwise.
89 return creds.blob == self.pubkeys.get(creds.username)
91 def _correctSignature(self, creds):
93 Determine whether the signature in the given credentials is the correct
94 signature for the data in those credentials.
96 Returns True if so, False otherwise.
98 key = keys.Key.fromString(creds.blob)
99 return key.verify(creds.signature, creds.sigData)
101 def _checkKey(self, creds):
103 Determine whether some key-based credentials correctly authenticates a
106 Returns a Deferred that fires with the username if so or with an
107 UnauthorizedLogin failure otherwise.
109 if self._allowedKey(creds):
110 if creds.signature is None:
111 return defer.fail(conch_error.ValidPublicKey())
112 if self._correctSignature(creds):
113 return defer.succeed(creds.username)
114 return defer.fail(error.UnauthorizedLogin())
116 class AccountURLChecker:
117 implements(checkers.ICredentialsChecker)
118 credentialInterfaces = (credentials.IUsernamePassword,)
120 def __init__(self, client, auth_url):
122 self.auth_url = auth_url
124 def _cbPasswordMatch(self, rootcap, username):
125 return FTPAvatarID(username, rootcap)
127 def post_form(self, username, password):
128 sepbase = base32.b2a(os.urandom(4))
132 fields = {"action": "authenticate",
136 for name, value in fields.iteritems():
137 form.append('Content-Disposition: form-data; name="%s"' % name)
139 assert isinstance(value, str)
143 body = "\r\n".join(form) + "\r\n"
144 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
146 return getPage(self.auth_url, method="POST",
147 postdata=body, headers=headers,
148 followRedirect=True, timeout=30)
150 def _parse_response(self, res):
151 rootcap = res.strip()
153 raise error.UnauthorizedLogin
156 def requestAvatarId(self, credentials):
157 # construct a POST to the login form. While this could theoretically
158 # be done with something like the stdlib 'email' package, I can't
159 # figure out how, so we just slam together a form manually.
160 d = self.post_form(credentials.username, credentials.password)
161 d.addCallback(self._parse_response)
162 d.addCallback(self._cbPasswordMatch, str(credentials.username))