dropping the remaining config stuff for flask_sso.

updaing the user 'sso' endpoint to provide additional information for debugging.
Pulling information from ldap to stay super consistent on where we get our information.
This commit is contained in:
Dan Funk 2020-05-22 09:50:18 -04:00
parent 1cc9a60bfe
commit b490005af7
5 changed files with 69 additions and 85 deletions

View File

@ -27,34 +27,6 @@ TOKEN_AUTH_SECRET_KEY = environ.get('TOKEN_AUTH_SECRET_KEY', default="Shhhh!!! T
FRONTEND_AUTH_CALLBACK = environ.get('FRONTEND_AUTH_CALLBACK', default="http://localhost:4200/session")
SWAGGER_AUTH_KEY = environ.get('SWAGGER_AUTH_KEY', default="SWAGGER")
#: Default attribute map for single signon.
SSO_LOGIN_URL = '/login'
SSO_ATTRIBUTE_MAP = {
'Eppn': (False, 'eppn'), # dhf8r@virginia.edu
'Uid': (True, 'uid'), # dhf8r
'givenName': (False, 'first_name'), # Daniel
'Sn': (False, 'last_name'), # Funk
'affiliation': (False, 'affiliation'), # 'staff@virginia.edu;member@virginia.edu'
'displayName': (False, 'display_name'), # Daniel Harold Funk
'title': (False, 'title') # SOFTWARE ENGINEER V
}
# This what I see coming back:
# X-Remote-Cn: Daniel Harold Funk (dhf8r)
# X-Remote-Sn: Funk
# X-Remote-Givenname: Daniel
# X-Remote-Uid: dhf8r
# Eppn: dhf8r@virginia.edu
# Cn: Daniel Harold Funk (dhf8r)
# Sn: Funk
# Givenname: Daniel
# Uid: dhf8r
# X-Remote-User: dhf8r@virginia.edu
# X-Forwarded-For: 128.143.0.10
# X-Forwarded-Host: dev.crconnect.uvadcos.io
# X-Forwarded-Server: dev.crconnect.uvadcos.io
# Connection: Keep-Alive
# %s/%i placeholders expected for uva_id and study_id in various calls.
PB_BASE_URL = environ.get('PB_BASE_URL', default="http://localhost:5001/pb/")
PB_USER_STUDIES_URL = environ.get('PB_USER_STUDIES_URL', default=PB_BASE_URL + "user_studies?uva_id=%s")

View File

@ -6,7 +6,7 @@ from flask import redirect, g, request
from crc import app, db
from crc.api.common import ApiError
from crc.models.user import UserModel, UserModelSchema
from crc.services.ldap_service import LdapService
from crc.services.ldap_service import LdapService, LdapUserInfo
"""
.. module:: crc.api.user
@ -32,6 +32,7 @@ def verify_token(token):
def get_current_user():
return UserModelSchema().dump(g.user)
@app.route('/login')
def sso_login():
# This what I see coming back:
# X-Remote-Cn: Daniel Harold Funk (dhf8r)
@ -59,67 +60,48 @@ def sso_login():
redirect = request.args.get('redirect')
app.logger.info("SSO_LOGIN: Full URL: " + request.url)
app.logger.info("SSO_LOGIN: User Id: " + uid)
app.logger.info("SSO_LOGIN: Will try to redirect to : " + redirect)
app.logger.info("SSO_LOGIN: Will try to redirect to : " + str(redirect))
ldap_service = LdapService()
info = ldap_service.user_info(uid)
user = UserModel(uid=uid, email_address=info.email, display_name=info.display_name,
affiliation=info.affiliation, title=info.title)
# TODO: Get redirect URL from Shibboleth request header
_handle_login(user, redirect)
return _handle_login(info, redirect)
@app.route('/sso')
def sso():
response = ""
response += "<h1>Headers</h1>"
response += str(request.headers)
response += "<ul>"
for k,v in request.headers:
response += "<li><b>%s</b> %s</li>\n" % (k, v)
response += "<h1>Environment</h1>"
response += str(request.environ)
for k,v in request.environ:
response += "<li><b>%s</b> %s</li>\n" % (k, v)
return response
@app.route('/login')
def _handle_login(user_info, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
def _handle_login(user_info: LdapUserInfo, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
"""On successful login, adds user to database if the user is not already in the system,
then returns the frontend auth callback URL, with auth token appended.
Args:
user_info (dict of {
uid: str,
affiliation: Optional[str],
display_name: Optional[str],
email_address: Optional[str],
eppn: Optional[str],
first_name: Optional[str],
last_name: Optional[str],
title: Optional[str],
}): Dictionary of user attributes
redirect_url: Optional[str]
user_info - an ldap user_info object.
redirect_url: Optional[str]
Returns:
Response. 302 - Redirects to the frontend auth callback URL, with auth token appended.
"""
uid = user_info['uid']
user = db.session.query(UserModel).filter(UserModel.uid == uid).first()
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
if user is None:
# Add new user
user = UserModelSchema().load(user_info, session=db.session)
else:
# Update existing user data
user = UserModelSchema().load(user_info, session=db.session, instance=user, partial=True)
user = UserModel()
# Build display_name if not set
if 'display_name' not in user_info or len(user_info['display_name']) == 0:
display_name_list = []
for prop in ['first_name', 'last_name']:
if prop in user_info and len(user_info[prop]) > 0:
display_name_list.append(user_info[prop])
user.display_name = ' '.join(display_name_list)
user.uid = user_info.uid
user.display_name = user_info.display_name
user.email_address = user_info.email_address
user.affiliation = user_info.affiliation
user.title = user_info.title
db.session.add(user)
db.session.commit()
@ -133,6 +115,8 @@ def _handle_login(user_info, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
app.logger.info("SSO_LOGIN: NO REDIRECT, JUST RETURNING AUTH TOKEN.")
return auth_token
def backdoor(
uid=None,
affiliation=None,
@ -165,11 +149,9 @@ def backdoor(
ApiError. If on production, returns a 404 error.
"""
if not 'PRODUCTION' in app.config or not app.config['PRODUCTION']:
user_info = {}
for key in UserModel.__dict__.keys():
if key in connexion.request.args:
user_info[key] = connexion.request.args[key]
return _handle_login(user_info, redirect_url)
ldap_info = LdapUserInfo()
ldap_info.uid = connexion.request.args["uid"]
ldap_info.email_address = connexion.request.args["email_address"]
return _handle_login(ldap_info, redirect_url)
else:
raise ApiError('404', 'unknown')

View File

@ -8,17 +8,30 @@ from crc.api.common import ApiError
class LdapUserInfo(object):
def __init__(self, entry):
self.display_name = entry.displayName.value
self.given_name = ", ".join(entry.givenName)
self.email = entry.mail.value
self.telephone_number = ", ".join(entry.telephoneNumber)
self.title = ", ".join(entry.title)
self.department = ", ".join(entry.uvaDisplayDepartment)
self.affiliation = ", ".join(entry.uvaPersonIAMAffiliation)
self.sponsor_type = ", ".join(entry.uvaPersonSponsoredType)
self.uid = entry.uid.value
def __init__(self):
self.display_name = ''
self.given_name = ''
self.email_address = ''
self.telephone_number = ''
self.title = ''
self.department = ''
self.affiliation = ''
self.sponsor_type = ''
self.uid = ''
@classmethod
def from_entry(cls, entry):
instance = cls()
instance.display_name = entry.displayName.value
instance.given_name = ", ".join(entry.givenName)
instance.email_address = entry.mail.value
instance.telephone_number = ", ".join(entry.telephoneNumber)
instance.title = ", ".join(entry.title)
instance.department = ", ".join(entry.uvaDisplayDepartment)
instance.affiliation = ", ".join(entry.uvaPersonIAMAffiliation)
instance.sponsor_type = ", ".join(entry.uvaPersonSponsoredType)
instance.uid = entry.uid.value
return instance
class LdapService(object):
search_base = "ou=People,o=University of Virginia,c=US"
@ -50,7 +63,7 @@ class LdapService(object):
if len(self.conn.entries) < 1:
raise ApiError("missing_ldap_record", "Unable to locate a user with id %s in LDAP" % uva_uid)
entry = self.conn.entries[0]
return(LdapUserInfo(entry))
return LdapUserInfo.from_entry(entry)
def search_users(self, query, limit):
search_string = LdapService.uid_search_string % query
@ -64,6 +77,6 @@ class LdapService(object):
for entry in self.conn.entries:
if count > limit:
break
results.append(LdapUserInfo(entry))
results.append(LdapUserInfo.from_entry(entry))
count += 1
return results

View File

@ -12,7 +12,7 @@ class TestAuthentication(BaseTest):
self.assertTrue(isinstance(auth_token, bytes))
self.assertEqual("dhf8r", user.decode_auth_token(auth_token).get("sub"))
def test_auth_creates_user(self):
def test_backdoor_auth_creates_user(self):
new_uid = 'czn1z';
self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
@ -37,6 +37,23 @@ class TestAuthentication(BaseTest):
self.assertTrue(rv_2.status_code == 302)
self.assertTrue(str.startswith(rv_2.location, redirect_url))
def test_normal_auth_creates_user(self):
new_uid = 'lb3dp' # This user is in the test ldap system.
self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
self.assertIsNone(user)
redirect_url = 'http://worlds.best.website/admin'
headers = dict(Uid=new_uid)
rv = self.app.get('login', follow_redirects=False, headers=headers)
self.assert_success(rv)
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
self.assertIsNotNone(user)
self.assertEquals(new_uid, user.uid)
self.assertEquals("Laura Barnes", user.display_name)
self.assertEquals("lb3dp@virginia.edu", user.email_address)
self.assertEquals("E0:Associate Professor of Systems and Information Engineering", user.title)
def test_current_user_status(self):
self.load_example_data()
rv = self.app.get('/v1.0/user')

View File

@ -21,7 +21,7 @@ class TestLdapService(BaseTest):
self.assertEqual("lb3dp", user_info.uid)
self.assertEqual("Laura Barnes", user_info.display_name)
self.assertEqual("Laura", user_info.given_name)
self.assertEqual("lb3dp@virginia.edu", user_info.email)
self.assertEqual("lb3dp@virginia.edu", user_info.email_address)
self.assertEqual("+1 (434) 924-1723", user_info.telephone_number)
self.assertEqual("E0:Associate Professor of Systems and Information Engineering", user_info.title)
self.assertEqual("E0:EN-Eng Sys and Environment", user_info.department)