2 from zope.interface import implements
3 from twisted.web.client import getPage
4 from twisted.internet import defer
5 from twisted.cred import error, checkers, credentials
6 from twisted.conch import error as conch_error
7 from twisted.conch.ssh import keys
9 from allmydata.util import base32
11 class NeedRootcapLookupScheme(Exception):
12 """Accountname+Password-based access schemes require some kind of
13 mechanism to translate name+passwd pairs into a rootcap, either a file of
14 name/passwd/rootcap tuples, or a server to do the translation."""
17 def __init__(self, username, rootcap):
18 self.username = username
19 self.rootcap = rootcap
21 class AccountFileChecker:
22 implements(checkers.ICredentialsChecker)
23 credentialInterfaces = (credentials.IUsernamePassword,
24 credentials.IUsernameHashedPassword,
25 credentials.ISSHPrivateKey)
26 def __init__(self, client, accountfile):
31 for line in open(os.path.expanduser(accountfile), "r"):
33 if line.startswith("#") or not line:
35 name, passwd, rest = line.split(None, 2)
36 if passwd.startswith("ssh-"):
38 keystring = " ".join([passwd] + bits[:-1])
40 self.pubkeys[name] = keystring
42 self.passwords[name] = passwd
44 self.rootcaps[name] = rootcap
46 def _avatarId(self, username):
47 return FTPAvatarID(username, self.rootcaps[username])
49 def _cbPasswordMatch(self, matched, username):
51 return self._avatarId(username)
52 raise error.UnauthorizedLogin
54 def requestAvatarId(self, creds):
55 if credentials.ISSHPrivateKey.providedBy(creds):
56 # Re-using twisted.conch.checkers.SSHPublicKeyChecker here, rather
57 # than re-implementing all of the ISSHPrivateKey checking logic,
58 # would be better. That would require Twisted 14.1.0 or newer,
60 return self._checkKey(creds)
61 elif credentials.IUsernameHashedPassword.providedBy(creds):
62 return self._checkPassword(creds)
63 elif credentials.IUsernamePassword.providedBy(creds):
64 return self._checkPassword(creds)
66 raise NotImplementedError()
68 def _checkPassword(self, creds):
70 Determine whether the password in the given credentials matches the
71 password in the account file.
73 Returns a Deferred that fires with the username if the password matches
74 or with an UnauthorizedLogin failure otherwise.
77 correct = self.passwords[creds.username]
79 return defer.fail(error.UnauthorizedLogin())
81 d = defer.maybeDeferred(creds.checkPassword, correct)
82 d.addCallback(self._cbPasswordMatch, str(creds.username))
85 def _checkKey(self, creds):
87 Determine whether some key-based credentials correctly authenticates a
90 Returns a Deferred that fires with the username if so or with an
91 UnauthorizedLogin failure otherwise.
94 # Is the public key indicated by the given credentials allowed to
95 # authenticate the username in those credentials?
96 if creds.blob == self.pubkeys.get(creds.username):
97 if creds.signature is None:
98 return defer.fail(conch_error.ValidPublicKey())
100 # Is the signature in the given credentials the correct
101 # signature for the data in those credentials?
102 key = keys.Key.fromString(creds.blob)
103 if key.verify(creds.signature, creds.sigData):
104 return defer.succeed(self._avatarId(creds.username))
106 return defer.fail(error.UnauthorizedLogin())
108 class AccountURLChecker:
109 implements(checkers.ICredentialsChecker)
110 credentialInterfaces = (credentials.IUsernamePassword,)
112 def __init__(self, client, auth_url):
114 self.auth_url = auth_url
116 def _cbPasswordMatch(self, rootcap, username):
117 return FTPAvatarID(username, rootcap)
119 def post_form(self, username, password):
120 sepbase = base32.b2a(os.urandom(4))
124 fields = {"action": "authenticate",
128 for name, value in fields.iteritems():
129 form.append('Content-Disposition: form-data; name="%s"' % name)
131 assert isinstance(value, str)
135 body = "\r\n".join(form) + "\r\n"
136 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
138 return getPage(self.auth_url, method="POST",
139 postdata=body, headers=headers,
140 followRedirect=True, timeout=30)
142 def _parse_response(self, res):
143 rootcap = res.strip()
145 raise error.UnauthorizedLogin
148 def requestAvatarId(self, credentials):
149 # construct a POST to the login form. While this could theoretically
150 # be done with something like the stdlib 'email' package, I can't
151 # figure out how, so we just slam together a form manually.
152 d = self.post_form(credentials.username, credentials.password)
153 d.addCallback(self._parse_response)
154 d.addCallback(self._cbPasswordMatch, str(credentials.username))