ipsilon-fedora/ipsilon/providers/openid/extensions/api.py
Aurélien Bompard 8f259d9236
Don't crash on users who don't have a timezone
Signed-off-by: Aurélien Bompard <aurelien@bompard.org>
2022-01-28 18:22:32 +01:00

132 lines
5 KiB
Python

# Copyright (C) 2015 Patrick Uiterwijk, for license see COPYING
from __future__ import absolute_import
from python_freeipa.client_meta import ClientMeta as IPAClient
from python_freeipa.exceptions import InvalidSessionPassword
from ipsilon.providers.openid.extensions.common import OpenidExtensionBase
import ipsilon.root
from ipsilon.util.page import Page
from ipsilon.util.user import User
import json
import inspect
from configparser import ConfigParser
class OpenidExtension(OpenidExtensionBase):
def __init__(self, *pargs):
super(OpenidExtension, self).__init__('API')
def enable(self):
# This is the most ugly hack in my history of python...
# But I need to find the root object, and that is not passed into
# the OpenID extension system anywhere...
root_obj = inspect.stack()[5][0].f_locals['self']
root_obj.api = APIPage(root_obj)
class APIPage(Page):
def __init__(self, root_obj):
ipsilon.root.sites['api'] = dict()
ipsilon.root.sites['api']['template_env'] = \
ipsilon.root.sites['default']['template_env']
super(APIPage, self).__init__(ipsilon.root.sites['api'])
self.v1 = APIV1Page(root_obj)
class APIV1Page(Page):
def __init__(self, root_obj):
ipsilon.root.sites['api_v1'] = dict()
ipsilon.root.sites['api_v1']['template_env'] = \
ipsilon.root.sites['default']['template_env']
super(APIV1Page, self).__init__(ipsilon.root.sites['api_v1'])
self.root_obj = root_obj
def root(self, *args, **kwargs):
return json.dumps(self._perform_call(kwargs))
def _perform_call(self, arguments):
required_arguments = ['auth_module', 'username', 'password']
for arg in required_arguments:
if not arg in arguments:
return {'success': False,
'status': 400,
'message': 'Missing argument: %s' % arg
}
openid = self.root_obj.openid
openid_request = None
try:
openid_request = openid.cfg.server.decodeRequest(arguments)
except Exception as ex:
print('Error during openid decoding: %s' % ex)
return {'success': False,
'status': 400,
'message': 'Invalid request'
}
if not openid_request:
print('No OpenID request parsed')
return {'success': False,
'status': 400,
'message': 'Invalid request'
}
if not arguments['auth_module'] == 'fedoauth.auth.fas.Auth_FAS':
print('Unknown auth module selected')
return {'success': False,
'status': 400,
'message': 'Unknown authentication module'
}
username = arguments['username']
password = arguments['password']
user = None
userdata = None
# Check auth with IPA directly
ipa_config = ConfigParser()
ipa_config.read("/etc/ipa/default.conf")
ipa_server = ipa_config.get("global", "server", fallback=None)
ipa = IPAClient(ipa_server, verify_ssl="/etc/ipa/ca.crt")
try:
auth = ipa.login(username, password)
except InvalidSessionPassword:
print('Could not authenticate %s to IPA' % username)
return {'success': False,
'status': 400,
'message': 'Authentication failed'}
if auth and auth.logged_in:
user = ipa.user_find(whoami=True)["result"][0]
userdata = {
"nickname": user["uid"][0],
"fullname": user["displayname"][0],
"_groups": user["memberof_group"],
"email": user["mail"][0],
"givenname": user["givenname"], # It's not a list? WTF?
"surname": user["sn"][0],
"zoneinfo": user.get("fastimezone", [None])[0],
}
userdata["human_name"] = userdata["fullname"]
userdata["name"] = userdata["fullname"]
userdata["preferred_username"] = userdata["nickname"]
userdata["_username"] = userdata["nickname"]
else:
print('Error during auth: %s' % auth.login_exception)
if user is None or userdata is None:
print('No user or data: %s, %s' % (user, userdata))
return {'success': False,
'status': 400,
'message': 'Authentication failed'}
us_obj = User(username)
fake_session = lambda: None
setattr(fake_session, 'get_user', lambda *args: us_obj)
setattr(fake_session, 'get_user_attrs', lambda *args: userdata)
openid_response = openid._response(openid_request, fake_session)
openid_response = openid.cfg.server.signatory.sign(openid_response).fields.toPostArgs()
return {'success': True,
'response': openid_response}