cr-connect-workflow/crc/api/user.py

138 lines
4.6 KiB
Python

import json
import connexion
from flask import redirect, g, request
from crc import sso, app, db
from crc.api.common import ApiError
from crc.models.user import UserModel, UserModelSchema
"""
.. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers
"""
def verify_token(token):
    """Validate an auth token and store the matching user on ``flask.g``.

    Outside production, presenting the configured ``SWAGGER_AUTH_KEY`` in place of a
    real token authenticates the caller as the first user found in the database.

    Args:
        token: The encoded auth token (or, outside production, the Swagger auth key).

    Returns:
        The decoded token payload produced by ``UserModel.decode_auth_token``.

    Raises:
        ApiError: ``invalid_token`` (status 403) when the token cannot be decoded or
            no user matches it.
    """
    failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate", status_code=403)

    # A missing PRODUCTION key is treated the same as PRODUCTION being falsy.
    if not app.config.get('PRODUCTION') and token == app.config["SWAGGER_AUTH_KEY"]:
        g.user = UserModel.query.first()
        if g.user is None:
            # No users exist yet, so there is nobody to impersonate; fail with the
            # same 403 rather than an AttributeError on None.
            raise failure_error
        token = g.user.encode_auth_token()

    try:
        token_info = UserModel.decode_auth_token(token)
        g.user = UserModel.query.filter_by(uid=token_info['sub']).first()
    except Exception as err:
        # Any decoding or lookup failure is reported to the client as an invalid
        # token; the original cause is kept on the exception chain for the logs.
        raise failure_error from err

    if g.user is None:
        raise failure_error
    return token_info
def get_current_user():
    """Return the user stored on ``flask.g``, serialized with ``UserModelSchema``."""
    schema = UserModelSchema()
    return schema.dump(g.user)
@sso.login_handler
def sso_login(user_info):
    """Handle a successful SSO login by delegating to ``_handle_login``.

    The optional ``redirect`` query argument of the current request is passed on as
    the redirect URL; when it is absent, ``None`` is passed.

    Args:
        user_info: Dictionary of user attributes supplied by the SSO layer.

    Returns:
        Whatever ``_handle_login`` returns (a redirect response, or the bare auth
        token when no redirect URL was given).
    """
    # Named redirect_url so the imported flask.redirect function is not shadowed.
    redirect_url = request.args.get('redirect')
    # Lazy %-style arguments keep logging safe when redirect_url is None.
    app.logger.info("SSO_LOGIN: Full URL: %s", request.url)
    app.logger.info("SSO_LOGIN: User Details: %s", json.dumps(user_info))
    app.logger.info("SSO_LOGIN: Will try to redirect to : %s", redirect_url)
    # TODO: Get redirect URL from Shibboleth request header
    return _handle_login(user_info, redirect_url)
def _handle_login(user_info, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
    """On successful login, adds user to database if the user is not already in the system,
    then returns the frontend auth callback URL, with auth token appended.

    Note that the default for ``redirect_url`` is read from ``app.config`` once, when
    this function is defined, not on each call.

    Args:
        user_info (dict of {
            uid: str,
            affiliation: Optional[str],
            display_name: Optional[str],
            email_address: Optional[str],
            eppn: Optional[str],
            first_name: Optional[str],
            last_name: Optional[str],
            title: Optional[str],
        }): Dictionary of user attributes
        redirect_url: Optional[str]

    Returns:
        Response. 302 - Redirects to the frontend auth callback URL, with auth token appended.
        If ``redirect_url`` is None, the auth token string itself is returned instead.

    Raises:
        KeyError: If ``user_info`` has no ``uid`` entry.
    """
    uid = user_info['uid']
    user = db.session.query(UserModel).filter(UserModel.uid == uid).first()

    if user is None:
        # Add new user
        user = UserModelSchema().load(user_info, session=db.session)
    else:
        # Update existing user data
        user = UserModelSchema().load(user_info, session=db.session, instance=user, partial=True)

    # Build display_name if not set. Truthiness checks treat a missing key, None and
    # an empty string alike, so optional attributes supplied as None do not crash.
    if not user_info.get('display_name'):
        name_parts = [user_info[prop] for prop in ('first_name', 'last_name') if user_info.get(prop)]
        user.display_name = ' '.join(name_parts)

    db.session.add(user)
    db.session.commit()

    # Return the frontend auth callback URL, with auth token appended.
    # NOTE(review): .decode() assumes encode_auth_token() returns bytes - confirm.
    auth_token = user.encode_auth_token().decode()
    if redirect_url is not None:
        app.logger.info("SSO_LOGIN: REDIRECTING TO: %s", redirect_url)
        return redirect('%s/%s' % (redirect_url, auth_token))

    app.logger.info("SSO_LOGIN: NO REDIRECT, JUST RETURNING AUTH TOKEN.")
    return auth_token
def backdoor(
        uid=None,
        affiliation=None,
        display_name=None,
        email_address=None,
        eppn=None,
        first_name=None,
        last_name=None,
        title=None,
        redirect_url=None,
):
    """Simulate a login as an arbitrary user, for end-to-end system testing.

    Only available when the application is not running in production. The user
    attributes are collected from the query arguments of the current request: every
    query argument whose name matches an entry in ``UserModel.__dict__`` is forwarded
    to ``_handle_login``.

    Args:
        uid: str
        affiliation: Optional[str]
        display_name: Optional[str]
        email_address: Optional[str]
        eppn: Optional[str]
        first_name: Optional[str]
        last_name: Optional[str]
        title: Optional[str]
        redirect_url: Optional[str]

    Returns:
        The result of ``_handle_login`` for the collected attributes and ``redirect_url``.

    Raises:
        ApiError: ``ApiError('404', 'unknown')`` when running in production.
    """
    # A missing PRODUCTION key counts as non-production, same as a falsy value.
    if app.config.get('PRODUCTION'):
        raise ApiError('404', 'unknown')

    query_args = connexion.request.args
    user_info = {
        attr_name: query_args[attr_name]
        for attr_name in UserModel.__dict__
        if attr_name in query_args
    }
    return _handle_login(user_info, redirect_url)