2 from zope.interface import implements
3 from twisted.web.client import getPage
4 from twisted.internet import defer
5 from twisted.cred import error, checkers, credentials
6 from allmydata.util import base32
class NeedRootcapLookupScheme(Exception):
    """No mechanism is configured to map name+password pairs to a rootcap.

    Accountname+Password-based access schemes require some kind of mechanism
    to translate name+passwd pairs into a rootcap: either a file of
    name/passwd/rootcap tuples, or a server to do the translation.
    """
class FTPAvatarID:
    """Pairs an authenticated username with that account's rootcap.

    NOTE(review): the ``class`` line itself was not visible in the reviewed
    listing; the name is taken from the ``FTPAvatarID(username, rootcap)``
    calls made by the credential checkers in this module -- confirm.
    """

    def __init__(self, username, rootcap):
        """Store `username` and `rootcap` unchanged as attributes."""
        self.username = username
        self.rootcap = rootcap
class AccountFileChecker:
    """Credentials checker backed by a flat account file.

    Each non-blank line that does not start with ``#`` is split on
    whitespace and has one of two shapes::

        <name> <password> <rootcap>
        <name> ssh-rsa|ssh-dss <key fields ...> <rootcap>

    Password accounts are recorded in ``self.passwords``, public-key accounts
    in ``self.pubkeys``, and every account's rootcap in ``self.rootcaps``,
    all keyed by account name.
    """
    implements(checkers.ICredentialsChecker)
    credentialInterfaces = (credentials.IUsernamePassword,
                            credentials.IUsernameHashedPassword)

    def __init__(self, client, accountfile):
        """Load all accounts from `accountfile` (``~`` is expanded).

        A line with fewer than three whitespace-separated fields raises
        ValueError from the tuple unpacking below.
        """
        # NOTE(review): storing `client` is assumed; nothing in this class
        # reads it -- confirm against callers.
        self.client = client
        self.passwords = {}
        self.pubkeys = {}
        self.rootcaps = {}
        f = open(os.path.expanduser(accountfile), "r")
        try:
            for line in f:
                # Lines read from a file keep their newline; strip it so that
                # blank lines are detected and the rootcap has no trailing
                # whitespace.
                line = line.strip()
                if line.startswith("#") or not line:
                    continue
                name, passwd, rest = line.split(None, 2)
                if passwd in ("ssh-dss", "ssh-rsa"):
                    bits = rest.split()
                    # The last field is the rootcap; everything before it is
                    # key material. (Joining bits[-1] would instead space out
                    # the characters of the rootcap.)
                    # NOTE(review): the key-type token (`passwd`) is not part
                    # of keystring -- confirm what readers of self.pubkeys
                    # expect.
                    keystring = " ".join(bits[:-1])
                    rootcap = bits[-1]
                    self.pubkeys[name] = keystring
                else:
                    self.passwords[name] = passwd
                    rootcap = rest
                self.rootcaps[name] = rootcap
        finally:
            # Always release the file handle, even if a line is malformed.
            f.close()

    def _cbPasswordMatch(self, matched, username):
        """Turn the outcome of a password check into an avatar ID.

        Returns an FTPAvatarID for `username` when `matched` is true;
        otherwise raises error.UnauthorizedLogin.
        """
        if matched:
            return FTPAvatarID(username, self.rootcaps[username])
        raise error.UnauthorizedLogin

    def requestAvatarId(self, credentials):
        """Check `credentials` against the passwords loaded from the file.

        Returns a Deferred that fires with an FTPAvatarID on success, and
        fails with error.UnauthorizedLogin when the username has no password
        entry or the password check does not match.
        """
        if credentials.username in self.passwords:
            d = defer.maybeDeferred(credentials.checkPassword,
                                    self.passwords[credentials.username])
            d.addCallback(self._cbPasswordMatch, str(credentials.username))
            return d
        return defer.fail(error.UnauthorizedLogin())
# AccountURLChecker: credentials checker that authenticates a username and
# password by POSTing them to self.auth_url. The value produced by
# _parse_response is handed to _cbPasswordMatch as the rootcap of the
# resulting FTPAvatarID.
# NOTE(review): the numbers embedded at the start of each line skip values
# (60, 68-70, 72-74, 77, 79-81, 84, 88, 90-91, 93-94), so statements appear
# to be missing from this listing -- e.g. nothing visible here defines `form`
# or the remaining entries of `fields`. Recover them from the canonical file
# before changing any code in this class.
55 class AccountURLChecker:
56 implements(checkers.ICredentialsChecker)
57 credentialInterfaces = (credentials.IUsernamePassword,)
# Remember the URL that credentials are POSTed to.
59 def __init__(self, client, auth_url):
61 self.auth_url = auth_url
# Final callback of requestAvatarId: wrap `rootcap` in an FTPAvatarID for
# `username`.
63 def _cbPasswordMatch(self, rootcap, username):
64 return FTPAvatarID(username, rootcap)
# Build a multipart/form-data body whose boundary is the base32 encoding of
# 4 random bytes, and POST it to self.auth_url with redirects followed and a
# 30 second timeout. Returns the result of getPage.
66 def post_form(self, username, password):
67 sepbase = base32.b2a(os.urandom(4))
71 fields = {"action": "authenticate",
75 for name, value in fields.iteritems():
76 form.append('Content-Disposition: form-data; name="%s"' % name)
# Field values must be `str`; this is only an assert, so the check is
# skipped when Python runs with -O.
78 assert isinstance(value, str)
82 body = "\r\n".join(form) + "\r\n"
83 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
85 return getPage(self.auth_url, method="POST",
86 postdata=body, headers=headers,
87 followRedirect=True, timeout=30)
# Callback for the POST response `res`; raises error.UnauthorizedLogin for a
# rejected login.
# NOTE(review): the condition guarding the raise and the value returned on
# success are not visible in this listing.
89 def _parse_response(self, res):
92 raise error.UnauthorizedLogin
# ICredentialsChecker entry point: POST the credentials, parse the reply,
# then wrap the result in an FTPAvatarID for str(credentials.username).
# NOTE(review): no `return d` is visible here -- confirm the Deferred is
# returned to the caller.
95 def requestAvatarId(self, credentials):
96 # construct a POST to the login form. While this could theoretically
97 # be done with something like the stdlib 'email' package, I can't
98 # figure out how, so we just slam together a form manually.
99 d = self.post_form(credentials.username, credentials.password)
100 d.addCallback(self._parse_response)
101 d.addCallback(self._cbPasswordMatch, str(credentials.username))