mirror of
https://github.com/sartography/cr-connect-workflow.git
synced 2025-02-22 12:48:25 +00:00
Upgrading libraries, and resolving issues from that upgrade, including changes to how we manage tokens. This seems to be working locally.
272 lines
8.6 KiB
Python
272 lines
8.6 KiB
Python
import flask
|
|
from flask import g, request
|
|
|
|
from crc import app, session
|
|
from crc.api.common import ApiError
|
|
from crc.models.user import UserModel, UserModelSchema
|
|
from crc.services.ldap_service import LdapService, LdapModel
|
|
from crc.services.user_service import UserService
|
|
|
|
"""
|
|
.. module:: crc.api.user
|
|
:synopsis: Single Sign On (SSO) user login and session handlers
|
|
"""
|
|
|
|
|
|
def verify_token(token=None):
|
|
"""
|
|
Verifies the token for the user (if provided). If in production environment and token is not provided,
|
|
gets user from the SSO headers and returns their token.
|
|
|
|
Args:
|
|
token: Optional[str]
|
|
|
|
Returns:
|
|
token: str
|
|
|
|
Raises:
|
|
ApiError. If not on production and token is not valid, returns an 'invalid_token' 403 error.
|
|
If on production and user is not authenticated, returns a 'no_user' 403 error.
|
|
"""
|
|
|
|
failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate",
|
|
status_code=403)
|
|
|
|
if token:
|
|
try:
|
|
token_info = UserModel.decode_auth_token(token)
|
|
g.user = UserModel.query.filter_by(uid=token_info['sub']).first()
|
|
|
|
# If the user is valid, store the token for this session
|
|
if g.user:
|
|
g.token = token
|
|
except:
|
|
raise failure_error
|
|
if g.user is not None:
|
|
return token_info
|
|
else:
|
|
raise failure_error
|
|
|
|
# If there's no token and we're in production, get the user from the SSO headers and return their token
|
|
elif _is_production():
|
|
uid = _get_request_uid(request)
|
|
|
|
if uid is not None:
|
|
db_user = UserModel.query.filter_by(uid=uid).first()
|
|
|
|
# If the user is valid, store the user and token for this session
|
|
if db_user is not None:
|
|
g.user = db_user
|
|
token = g.user.encode_auth_token().decode()
|
|
g.token = token
|
|
token_info = UserModel.decode_auth_token(token)
|
|
return token_info
|
|
|
|
else:
|
|
raise ApiError("no_user",
|
|
"User not found. Please login via the frontend app before accessing this feature.",
|
|
status_code=403)
|
|
|
|
else:
|
|
# Fall back to a default user if this is not production.
|
|
g.user = UserModel.query.first()
|
|
if not g.user:
|
|
raise ApiError("no_user", "You are in development mode, but there are no users in the database. Add one, and it will use it.")
|
|
token = g.user.encode_auth_token()
|
|
token_info = UserModel.decode_auth_token(token)
|
|
return token_info
|
|
|
|
|
|
def verify_token_admin(token=None):
|
|
"""
|
|
Verifies the token for the user (if provided) in non-production environment.
|
|
If in production environment, checks that the user is in the list of authorized admins
|
|
|
|
Args:
|
|
token: Optional[str]
|
|
|
|
Returns:
|
|
token: str
|
|
"""
|
|
verify_token(token)
|
|
if "user" in g and g.user.is_admin():
|
|
token = g.user.encode_auth_token()
|
|
token_info = UserModel.decode_auth_token(token)
|
|
return token_info
|
|
|
|
|
|
def start_impersonating(uid):
|
|
if uid is not None and UserService.user_is_admin():
|
|
UserService.start_impersonating(uid)
|
|
|
|
user = UserService.current_user(allow_admin_impersonate=True)
|
|
return UserModelSchema().dump(user)
|
|
|
|
|
|
def stop_impersonating():
|
|
if UserService.user_is_admin():
|
|
UserService.stop_impersonating()
|
|
|
|
user = UserService.current_user(allow_admin_impersonate=False)
|
|
return UserModelSchema().dump(user)
|
|
|
|
|
|
def get_current_user(admin_impersonate_uid=None):
|
|
if UserService.user_is_admin():
|
|
if admin_impersonate_uid is not None:
|
|
UserService.start_impersonating(admin_impersonate_uid)
|
|
else:
|
|
UserService.stop_impersonating()
|
|
|
|
user = UserService.current_user(UserService.user_is_admin() and UserService.admin_is_impersonating())
|
|
return UserModelSchema().dump(user)
|
|
|
|
|
|
def get_all_users():
|
|
if "user" in g and g.user.is_admin():
|
|
all_users = session.query(UserModel).all()
|
|
return UserModelSchema(many=True).dump(all_users)
|
|
|
|
|
|
def login(
|
|
uid=None,
|
|
redirect_url=None,
|
|
):
|
|
"""
|
|
In non-production environment, provides an endpoint for end-to-end system testing that allows the system
|
|
to simulate logging in as a specific user. In production environment, simply logs user in via single-sign-on
|
|
(SSO) Shibboleth authentication headers.
|
|
|
|
Args:
|
|
uid: Optional[str]
|
|
redirect_url: Optional[str]
|
|
|
|
Returns:
|
|
str. If not on production, returns the frontend auth callback URL, with auth token appended.
|
|
If on production and user is authenticated via SSO, returns the frontend auth callback URL,
|
|
with auth token appended.
|
|
|
|
Raises:
|
|
ApiError. If on production and user is not authenticated, returns a 404 error.
|
|
"""
|
|
|
|
# ----------------------------------------
|
|
# Shibboleth Authentication Headers
|
|
# ----------------------------------------
|
|
# X-Remote-Cn: Daniel Harold Funk (dhf8r)
|
|
# X-Remote-Sn: Funk
|
|
# X-Remote-Givenname: Daniel
|
|
# X-Remote-Uid: dhf8r
|
|
# Eppn: dhf8r@virginia.edu
|
|
# Cn: Daniel Harold Funk (dhf8r)
|
|
# Sn: Funk
|
|
# Givenname: Daniel
|
|
# Uid: dhf8r
|
|
# X-Remote-User: dhf8r@virginia.edu
|
|
# X-Forwarded-For: 128.143.0.10
|
|
# X-Forwarded-Host: dev.crconnect.uvadcos.io
|
|
# X-Forwarded-Server: dev.crconnect.uvadcos.io
|
|
# Connection: Keep-Alive
|
|
|
|
# If we're in production, override any uid with the uid from the SSO request headers
|
|
if _is_production():
|
|
uid = _get_request_uid(request)
|
|
|
|
if uid:
|
|
app.logger.info("SSO_LOGIN: Full URL: " + request.url)
|
|
app.logger.info("SSO_LOGIN: User Id: " + uid)
|
|
app.logger.info("SSO_LOGIN: Will try to redirect to : " + str(redirect_url))
|
|
|
|
ldap_info = LdapService().user_info(uid)
|
|
|
|
if ldap_info:
|
|
return _handle_login(ldap_info, redirect_url)
|
|
|
|
raise ApiError('404', 'unknown')
|
|
|
|
|
|
@app.route('/sso')
|
|
def sso():
|
|
response = ""
|
|
response += "<h1>Headers</h1>"
|
|
response += "<ul>"
|
|
for k, v in request.headers:
|
|
response += "<li><b>%s</b> %s</li>\n" % (k, v)
|
|
response += "<h1>Environment</h1>"
|
|
for k, v in request.environ:
|
|
response += "<li><b>%s</b> %s</li>\n" % (k, v)
|
|
return response
|
|
|
|
|
|
def _handle_login(user_info: LdapModel, redirect_url=None):
|
|
"""
|
|
On successful login, adds user to database if the user is not already in the system,
|
|
then returns the frontend auth callback URL, with auth token appended.
|
|
|
|
Args:
|
|
user_info - an ldap user_info object.
|
|
redirect_url: Optional[str]
|
|
|
|
Returns:
|
|
Response. 302 - Redirects to the frontend auth callback URL, with auth token appended.
|
|
"""
|
|
user = _upsert_user(user_info)
|
|
g.user = user
|
|
|
|
# Return the frontend auth callback URL, with auth token appended.
|
|
auth_token = user.encode_auth_token()
|
|
g.token = auth_token
|
|
|
|
if redirect_url is not None:
|
|
if redirect_url.find("http://") != 0 and redirect_url.find("https://") != 0:
|
|
redirect_url = "http://" + redirect_url
|
|
url = '%s?token=%s' % (redirect_url, auth_token)
|
|
app.logger.info("SSO_LOGIN: REDIRECTING TO: " + url)
|
|
return flask.redirect(url, code=302)
|
|
else:
|
|
app.logger.info("SSO_LOGIN: NO REDIRECT, JUST RETURNING AUTH TOKEN.")
|
|
return auth_token
|
|
|
|
|
|
def _upsert_user(user_info):
|
|
user = session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
|
|
|
|
if user is None:
|
|
# Add new user
|
|
user = UserModel()
|
|
else:
|
|
user = session.query(UserModel).filter(UserModel.uid == user_info.uid).with_for_update().first()
|
|
|
|
user.uid = user_info.uid
|
|
user.display_name = user_info.display_name
|
|
user.email_address = user_info.email_address
|
|
user.affiliation = user_info.affiliation
|
|
user.title = user_info.title
|
|
|
|
session.add(user)
|
|
session.commit()
|
|
return user
|
|
|
|
|
|
def _get_request_uid(req):
|
|
uid = None
|
|
|
|
if _is_production():
|
|
|
|
if 'user' in g and g.user is not None:
|
|
return g.user.uid
|
|
|
|
uid = req.headers.get("Uid")
|
|
if not uid:
|
|
uid = req.headers.get("X-Remote-Uid")
|
|
|
|
if not uid:
|
|
raise ApiError("invalid_sso_credentials", "'Uid' nor 'X-Remote-Uid' were present in the headers: %s"
|
|
% str(req.headers))
|
|
|
|
return uid
|
|
|
|
|
|
def _is_production():
|
|
return 'PRODUCTION' in app.config and app.config['PRODUCTION']
|