cr-connect-workflow/crc/api/user.py

155 lines
5.0 KiB
Python

import flask
from flask import g, request
from crc import app, db
from crc.api.common import ApiError
from crc.models.user import UserModel, UserModelSchema
from crc.services.ldap_service import LdapService, LdapModel
"""
.. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers
"""
def verify_token(token):
    """Validate an auth token and store the matching user on ``flask.g``.

    When the application is not flagged as ``PRODUCTION``, the configured
    ``SWAGGER_AUTH_KEY`` is accepted in place of a real token: it is swapped
    for a token encoded for the first user returned by the database.

    Args:
        token: The auth token supplied with the request.

    Returns:
        The decoded token info returned by ``UserModel.decode_auth_token``.

    Raises:
        ApiError: ``invalid_token`` (status 403) when the token cannot be
            decoded or no user matches it.
    """
    failure_error = ApiError(
        "invalid_token",
        "Unable to decode the token you provided. Please re-authenticate",
        status_code=403,
    )

    # A missing PRODUCTION key is treated the same as a falsy one.
    if not app.config.get('PRODUCTION') and token == app.config["SWAGGER_AUTH_KEY"]:
        g.user = UserModel.query.first()
        if g.user is None:
            # No users exist yet: report an auth failure instead of crashing
            # with an AttributeError on None.
            raise failure_error
        token = g.user.encode_auth_token()

    try:
        token_info = UserModel.decode_auth_token(token)
        g.user = UserModel.query.filter_by(uid=token_info['sub']).first()
    except Exception as err:
        # Any decoding/lookup failure is reported as the same 403, but
        # SystemExit/KeyboardInterrupt are no longer swallowed and the
        # original cause stays attached for debugging.
        raise failure_error from err

    if g.user is None:
        raise failure_error
    return token_info
def get_current_user():
    """Return the user stored on ``flask.g`` serialized with ``UserModelSchema``."""
    schema = UserModelSchema()
    return schema.dump(g.user)
@app.route('/v1.0/login')
def sso_login():
    """Log a user in based on the user id found in the request headers.

    The user id is read from the ``Uid`` header, falling back to
    ``X-Remote-Uid``. Headers previously observed on incoming requests
    include: X-Remote-Cn, X-Remote-Sn, X-Remote-Givenname, X-Remote-Uid,
    Eppn, Cn, Sn, Givenname, Uid, X-Remote-User, X-Forwarded-For,
    X-Forwarded-Host, X-Forwarded-Server and Connection.

    The optional ``redirect`` query argument is handed to ``_handle_login``
    together with the LDAP record looked up for the user id.

    Raises:
        ApiError: ``invalid_sso_credentials`` when neither header carries a
            user id.
    """
    headers = request.headers
    uid = headers.get("Uid") or headers.get("X-Remote-Uid")
    if not uid:
        raise ApiError("invalid_sso_credentials", "'Uid' nor 'X-Remote-Uid' were present in the headers: %s"
                       % str(headers))

    redirect = request.args.get('redirect')

    log = app.logger.info
    log("SSO_LOGIN: Full URL: " + request.url)
    log("SSO_LOGIN: User Id: " + uid)
    log("SSO_LOGIN: Will try to redirect to : " + str(redirect))

    return _handle_login(LdapService.user_info(uid), redirect)
@app.route('/sso')
def sso():
    """Render a debugging page listing the request headers and request environ.

    Returns:
        str: An HTML fragment with one list of header name/value pairs and
        one list of environ key/value pairs.
    """
    # Imported locally so this view does not depend on a module-level import.
    from html import escape

    def _list_items(pairs):
        # Names and values come from the client (headers) or the server
        # environment; escape both so they cannot inject markup.
        return "".join(
            "<li><b>%s</b> %s</li>\n" % (escape(str(key)), escape(str(value)))
            for key, value in pairs
        )

    # NOTE(review): this page exposes the full request environ to any caller;
    # consider restricting it to non-production deployments.
    parts = [
        "<h1>Headers</h1>",
        "<ul>",
        _list_items(request.headers),
        "</ul>",
        "<h1>Environment</h1>",
        "<ul>",
        # environ is a mapping: iterate its items, not its keys.
        _list_items(request.environ.items()),
        "</ul>",
    ]
    return "".join(parts)
def _handle_login(user_info: LdapModel, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
    """Persist the user described by ``user_info`` and hand back an auth token.

    Looks up the ``UserModel`` whose uid matches ``user_info.uid``, creating
    one if none exists, copies the LDAP attributes onto it and commits. The
    user's auth token is then either appended to ``redirect_url`` as a
    ``token`` query parameter, or returned directly.

    Args:
        user_info: An LDAP user info object providing ``uid``,
            ``display_name``, ``email_address``, ``affiliation`` and ``title``.
        redirect_url: Optional[str]. URL to redirect to with the token
            appended. Defaults to the ``FRONTEND_AUTH_CALLBACK`` setting as
            read when this module is imported. Pass ``None`` to get the bare
            token back instead of a redirect.

    Returns:
        A 302 redirect response to ``redirect_url`` with the token appended,
        or the auth token string when ``redirect_url`` is ``None``.
    """
    user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
    if user is None:
        # Add new user
        user = UserModel()

    # NOTE(review): indentation was lost in the reviewed source; the attribute
    # sync below is applied to both new and existing users -- confirm.
    user.uid = user_info.uid
    user.display_name = user_info.display_name
    user.email_address = user_info.email_address
    user.affiliation = user_info.affiliation
    user.title = user_info.title
    db.session.add(user)
    db.session.commit()

    auth_token = user.encode_auth_token()
    if isinstance(auth_token, bytes):
        # Tolerate token encoders that already return text rather than bytes.
        auth_token = auth_token.decode()

    if redirect_url is None:
        app.logger.info("SSO_LOGIN: NO REDIRECT, JUST RETURNING AUTH TOKEN.")
        return auth_token

    # NOTE(review): redirect_url can originate from a request argument, so the
    # token may be sent to an arbitrary host; consider an allow-list.
    if not redirect_url.startswith(("http://", "https://")):
        redirect_url = "http://" + redirect_url

    # Extend an existing query string rather than adding a second '?'.
    separator = '&' if '?' in redirect_url else '?'
    url = '%s%stoken=%s' % (redirect_url, separator, auth_token)
    app.logger.info("SSO_LOGIN: REDIRECTING TO: " + url)
    return flask.redirect(url, code=302)
def backdoor(
    uid=None,
    affiliation=None,
    display_name=None,
    email_address=None,
    eppn=None,
    first_name=None,
    last_name=None,
    title=None,
    redirect=None,
):
    """Simulate logging in as a specific user, for end-to-end system testing.

    Only works when the application is not flagged as ``PRODUCTION``. Only
    ``uid`` and ``redirect`` are used; the remaining parameters are accepted
    but ignored, and the user's details come from the LDAP lookup instead.

    Args:
        uid: str. User id to look up through ``LdapService.user_info``.
        affiliation: Optional[str]. Unused.
        display_name: Optional[str]. Unused.
        email_address: Optional[str]. Unused.
        eppn: Optional[str]. Unused.
        first_name: Optional[str]. Unused.
        last_name: Optional[str]. Unused.
        title: Optional[str]. Unused.
        redirect: Optional[str]. Passed to ``_handle_login`` as the redirect
            URL; ``None`` means the bare token is returned.

    Returns:
        Whatever ``_handle_login`` returns: a redirect response with the auth
        token appended, or the auth token itself when ``redirect`` is ``None``.

    Raises:
        ApiError: With HTTP status 404 when running in production, so the
            endpoint appears not to exist.
    """
    if app.config.get('PRODUCTION'):
        # Pass status_code explicitly: the first argument is only the error
        # code string and does not set the HTTP status by itself.
        raise ApiError('404', 'unknown', status_code=404)

    ldap_info = LdapService.user_info(uid)
    return _handle_login(ldap_info, redirect)