132 lines
5 KiB
Python
132 lines
5 KiB
Python
# Copyright (C) 2015 Patrick Uiterwijk, for license see COPYING
|
|
|
|
from __future__ import absolute_import
|
|
|
|
from python_freeipa.client_meta import ClientMeta as IPAClient
|
|
from python_freeipa.exceptions import InvalidSessionPassword
|
|
|
|
from ipsilon.providers.openid.extensions.common import OpenidExtensionBase
|
|
import ipsilon.root
|
|
from ipsilon.util.page import Page
|
|
from ipsilon.util.user import User
|
|
|
|
import json
|
|
import inspect
|
|
from configparser import ConfigParser
|
|
|
|
|
|
class OpenidExtension(OpenidExtensionBase):
    """OpenID extension that attaches the API page tree to the root object."""

    def __init__(self, *pargs):
        # Any positional arguments are accepted and ignored; the base class
        # is always initialised with the fixed name 'API'.
        super(OpenidExtension, self).__init__('API')

    def enable(self):
        """Locate the root object via the call stack and hang ``api`` off it."""
        # This is the most ugly hack in my history of python...
        # But I need to find the root object, and that is not passed into
        # the OpenID extension system anywhere...
        # Element 0 of a stack record is the frame object itself; the frame
        # five levels up is expected to be a method whose ``self`` is the
        # root object.
        frame_record = inspect.stack()[5]
        caller_frame = frame_record[0]
        root_obj = caller_frame.f_locals['self']
        root_obj.api = APIPage(root_obj)
|
|
|
|
|
|
class APIPage(Page):
    """Page for the 'api' site; exposes the version 1 API as ``self.v1``."""

    def __init__(self, root_obj):
        """Register the 'api' site and build the v1 sub-page.

        The new site entry reuses the template environment of the
        'default' site.
        """
        sites = ipsilon.root.sites
        sites['api'] = {}
        api_site = sites['api']
        api_site['template_env'] = sites['default']['template_env']
        super(APIPage, self).__init__(api_site)
        self.v1 = APIV1Page(root_obj)
|
|
|
|
|
|
class APIV1Page(Page):
    """Version 1 API page: password login against IPA, answered as OpenID.

    A request carries OpenID request parameters together with
    ``auth_module``, ``username`` and ``password``.  On success the signed
    OpenID response fields are returned; every handled failure is reported
    as a dict with ``success`` False, ``status`` 400 and a ``message``.
    """

    def __init__(self, root_obj):
        """Register the 'api_v1' site and remember the root object.

        The site entry reuses the template environment of the 'default'
        site.  ``root_obj`` is kept because ``_perform_call`` needs its
        ``openid`` attribute.
        """
        ipsilon.root.sites['api_v1'] = dict()
        ipsilon.root.sites['api_v1']['template_env'] = \
            ipsilon.root.sites['default']['template_env']
        super(APIV1Page, self).__init__(ipsilon.root.sites['api_v1'])
        self.root_obj = root_obj

    def root(self, *args, **kwargs):
        """Return the JSON-encoded result of ``_perform_call(kwargs)``.

        Positional arguments are ignored.
        """
        return json.dumps(self._perform_call(kwargs))

    @staticmethod
    def _failure(message):
        """Build the failure dict shared by every handled error path."""
        return {'success': False,
                'status': 400,
                'message': message
                }

    @staticmethod
    def _build_userdata(user):
        """Map one IPA user record onto the attribute dict handed to OpenID.

        Raises KeyError/IndexError if the record lacks one of the
        attributes read below (only ``fastimezone`` is optional).
        """
        userdata = {
            "nickname": user["uid"][0],
            "fullname": user["displayname"][0],
            "_groups": user["memberof_group"],
            "email": user["mail"][0],
            "givenname": user["givenname"],  # It's not a list? WTF?
            "surname": user["sn"][0],
            "zoneinfo": user.get("fastimezone", [None])[0],
        }
        # Several consumers expect the same values under different keys.
        userdata["human_name"] = userdata["fullname"]
        userdata["name"] = userdata["fullname"]
        userdata["preferred_username"] = userdata["nickname"]
        userdata["_username"] = userdata["nickname"]
        return userdata

    def _perform_call(self, arguments):
        """Authenticate against IPA and produce a signed OpenID response.

        :param arguments: mapping of request parameters; must contain
            ``auth_module``, ``username`` and ``password`` in addition to
            whatever the OpenID server needs to decode the request.
        :returns: ``{'success': True, 'response': <post args>}`` on
            success, otherwise the dict built by ``_failure``.
        """
        for arg in ('auth_module', 'username', 'password'):
            if arg not in arguments:
                return self._failure('Missing argument: %s' % arg)

        openid = self.root_obj.openid

        openid_request = None
        try:
            openid_request = openid.cfg.server.decodeRequest(arguments)
        except Exception as ex:
            # Any decoding problem is reported to the caller as a bad
            # request rather than propagated.
            print('Error during openid decoding: %s' % ex)
            return self._failure('Invalid request')
        if not openid_request:
            print('No OpenID request parsed')
            return self._failure('Invalid request')
        if arguments['auth_module'] != 'fedoauth.auth.fas.Auth_FAS':
            print('Unknown auth module selected')
            return self._failure('Unknown authentication module')

        username = arguments['username']
        password = arguments['password']
        user = None
        userdata = None

        # Check auth with IPA directly
        ipa_config = ConfigParser()
        ipa_config.read("/etc/ipa/default.conf")
        # NOTE(review): ipa_server is None when the file or option is
        # missing; confirm how IPAClient behaves in that case.
        ipa_server = ipa_config.get("global", "server", fallback=None)
        ipa = IPAClient(ipa_server, verify_ssl="/etc/ipa/ca.crt")
        try:
            auth = ipa.login(username, password)
        except InvalidSessionPassword:
            print('Could not authenticate %s to IPA' % username)
            return self._failure('Authentication failed')

        if auth and auth.logged_in:
            results = ipa.user_find(whoami=True)["result"]
            # An empty result list falls through to the generic
            # "Authentication failed" answer below instead of raising
            # IndexError.
            if results:
                user = results[0]
                userdata = self._build_userdata(user)
        else:
            # ``auth`` may be falsy (e.g. None) here, so the attribute is
            # read defensively instead of raising AttributeError.
            print('Error during auth: %s'
                  % getattr(auth, 'login_exception', None))

        if user is None or userdata is None:
            print('No user or data: %s, %s' % (user, userdata))
            return self._failure('Authentication failed')

        us_obj = User(username)
        # Minimal stand-in for a session object: a bare function used as an
        # attribute holder, offering only the two accessors set below.
        fake_session = lambda: None
        setattr(fake_session, 'get_user', lambda *args: us_obj)
        setattr(fake_session, 'get_user_attrs', lambda *args: userdata)

        openid_response = openid._response(openid_request, fake_session)
        signed_response = openid.cfg.server.signatory.sign(openid_response)
        openid_response = signed_response.fields.toPostArgs()
        return {'success': True,
                'response': openid_response}
|