Merge branch 'master' of github.com:sartography/cr-connect-workflow

This commit is contained in:
Dan Funk 2020-02-20 16:04:44 -05:00
commit d3c51af1f5
6 changed files with 185 additions and 47 deletions

View File

@ -14,11 +14,11 @@ FRONTEND_AUTH_CALLBACK = "http://localhost:4200"
SSO_ATTRIBUTE_MAP = {
'eppn': (False, 'eppn'), # dhf8r@virginia.edu
'uid': (True, 'uid'), # dhf8r
'givenName': (False, 'givenName'), # Daniel
'mail': (False, 'email'), # dhf8r@Virginia.EDU
'sn': (False, 'surName'), # Funk
'affiliation': (False, 'affiliation'), # 'staff@virginia.edu;member@virginia.edu'
'displayName': (False, 'displayName'), # Daniel Harold Funk
'givenName': (False, 'first_name'), # Daniel
'mail': (False, 'email_address'), # dhf8r@Virginia.EDU
'sn': (False, 'last_name'), # Funk
'affiliation': (False, 'affiliation'), # 'staff@virginia.edu;member@virginia.edu'
'displayName': (False, 'display_name'), # Daniel Harold Funk
'title': (False, 'title') # SOFTWARE ENGINEER V
}

View File

@ -7,19 +7,52 @@ info:
servers:
- url: http://localhost:5000/v1.0
paths:
# /v1.0/sso
/sso_backdoor/{uid}:
parameters:
- name: uid
in: path
required: true
description: The id of the user to connect as.
schema:
type: string
/sso_backdoor:
get:
operationId: crc.api.user.backdoor
summary: A backdoor that allows someone to log in as a specific user, if they
are in a staging environment.
parameters:
- name: uid
in: header
required: true
schema:
type: string
- name: email_address
in: header
required: false
schema:
type: string
- name: display_name
in: header
required: false
schema:
type: string
- name: affiliation
in: header
required: false
schema:
type: string
- name: eppn
in: header
required: false
schema:
type: string
- name: first_name
in: header
required: false
schema:
type: string
- name: last_name
in: header
required: false
schema:
type: string
- name: title
in: header
required: false
schema:
type: string
tags:
- Users
responses:
@ -602,6 +635,16 @@ components:
type: string
display_name:
type: string
affiliation:
type: string
eppn:
type: string
first_name:
type: string
last_name:
type: string
title:
type: string
DataModel:
properties:
id:

View File

@ -1,13 +1,15 @@
import connexion
from flask import redirect, g
from crc import sso, app, db, auth
from crc.api.common import ApiError
# User Accounts
# *****************************
from crc.models.user import UserModel, UserModelSchema
"""
.. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers
"""
@auth.verify_token
def verify_token(token):
@ -34,36 +36,81 @@ def sso_login(user_info):
def _handle_login(user_info):
"""On successful login, adds user to database if the user is not already in the system,
then returns the frontend auth callback URL, with auth token appended.
Args:
user_info (dict of {
uid: str,
affiliation: Optional[str],
display_name: Optional[str],
email_address: Optional[str],
eppn: Optional[str],
first_name: Optional[str],
last_name: Optional[str],
title: Optional[str],
}): Dictionary of user attributes
Returns:
str. returns the frontend auth callback URL, with auth token appended.
"""
uid = user_info['uid']
user = db.session.query(UserModel).filter(UserModel.uid == uid).first()
if user is None:
user = UserModel(uid=uid,
display_name=user_info["givenName"],
email_address=user_info["email"])
if "Surname" in user_info:
user.display_name = user.display_name + " " + user_info["Surname"]
if "displayName" in user_info and len(user_info["displayName"]) > 1:
user.display_name = user_info["displayName"]
# Update existing user data or create a new user
user = UserModelSchema().load(user_info, session=db.session)
db.session.add(user)
db.session.commit()
# redirect users back to the front end, include the new auth token.
# Build display_name if not set
if 'display_name' not in user_info or len(user_info['display_name']) == 0:
display_name_list = []
for prop in ['first_name', 'last_name']:
if prop in user_info and len(user_info[prop]) > 0:
display_name_list.append(user_info[prop])
user.display_name = ' '.join(display_name_list)
db.session.add(user)
db.session.commit()
# Return the frontend auth callback URL, with auth token appended.
auth_token = user.encode_auth_token().decode()
response_url = ("%s/%s" % (app.config["FRONTEND_AUTH_CALLBACK"], auth_token))
response_url = ('%s/%s' % (app.config['FRONTEND_AUTH_CALLBACK'], auth_token))
return redirect(response_url)
def backdoor(uid):
'''A backdoor that allows someone to log in as a specific user, if they
are in a staging environment. '''
if not "PRODUCTION" in app.config or not app.config["PRODUCTION"]:
user_info = {
"uid": uid,
"givenName": uid,
"email": uid + "@virginia.edu"
}
def backdoor():
"""A backdoor for end-to-end system testing that allows the system to simulate logging in as a specific user.
Only works if the application is running in a non-production environment.
Args:
headers (dict of {
uid: str,
affiliation: Optional[str],
display_name: Optional[str],
email_address: Optional[str],
eppn: Optional[str],
first_name: Optional[str],
last_name: Optional[str],
title: Optional[str],
}): Dictionary of user attributes
Returns:
str. If not on production, returns the frontend auth callback URL, with auth token appended.
Raises:
ApiError. If on production, returns a 404 error.
"""
if not 'PRODUCTION' in app.config or not app.config['PRODUCTION']:
# Translate uppercase HTTP_PROP_NAME to lowercase without HTTP_, if property exists in UserModel.
user_info = {}
for key, value in connexion.request.environ.items():
if key.startswith('HTTP_'):
prop = key[5:].lower()
if hasattr(UserModel, prop):
user_info[prop] = value
return _handle_login(user_info)
else:
raise ApiError("404", "unknown")
raise ApiError('404', 'unknown')

View File

@ -13,6 +13,11 @@ class UserModel(db.Model):
uid = db.Column(db.String, unique=True)
email_address = db.Column(db.String)
display_name = db.Column(db.String)
affiliation = db.Column(db.String, nullable=True)
eppn = db.Column(db.String, nullable=True)
first_name = db.Column(db.String, nullable=True)
last_name = db.Column(db.String, nullable=True)
title = db.Column(db.String, nullable=True)
def encode_auth_token(self):
"""

View File

@ -0,0 +1,36 @@
"""empty message
Revision ID: 0a6e0b829398
Revises: ad5483cb7f3b
Create Date: 2020-02-20 15:42:16.473470
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '0a6e0b829398'
down_revision = 'ad5483cb7f3b'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('user', sa.Column('affiliation', sa.String(), nullable=True))
op.add_column('user', sa.Column('eppn', sa.String(), nullable=True))
op.add_column('user', sa.Column('first_name', sa.String(), nullable=True))
op.add_column('user', sa.Column('last_name', sa.String(), nullable=True))
op.add_column('user', sa.Column('title', sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('user', 'title')
op.drop_column('user', 'last_name')
op.drop_column('user', 'first_name')
op.drop_column('user', 'eppn')
op.drop_column('user', 'affiliation')
# ### end Alembic commands ###

View File

@ -8,31 +8,33 @@ class TestAuthentication(BaseTest):
test_uid = "dhf8r"
def logged_in_headers(self, user=None):
if user is None:
uid = self.test_uid
headers = {'uid': self.test_uid, 'givenName': 'Daniel', 'mail': 'dhf8r@virginia.edu'}
headers = {'uid': self.test_uid, 'first_name': 'Daniel', 'last_name': 'Funk', 'email_address': 'dhf8r@virginia.edu'}
else:
uid = user.uid
headers = {'uid': user.uid, 'givenName': user.display_name, 'email': user.email_address}
headers = {'uid': user.uid, 'first_name': user.first_name, 'last_name': user.last_name, 'email_address': user.email_address}
rv = self.app.get("/v1.0/sso_backdoor/" + self.test_uid, follow_redirects=True,
rv = self.app.get("/v1.0/sso_backdoor", headers=headers, follow_redirects=True,
content_type="application/json")
user_model = UserModel.query.filter_by(uid=uid).first()
self.assertIsNotNone(user_model.display_name)
return dict(Authorization='Bearer ' + user_model.encode_auth_token().decode())
def test_auth_token(self):
self.load_example_data()
user = UserModel(uid="dhf8r")
auth_token = user.encode_auth_token()
self.assertTrue(isinstance(auth_token, bytes))
self.assertEqual("dhf8r", user.decode_auth_token(auth_token))
def test_auth_creates_user(self):
self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == self.test_uid).first()
self.assertIsNone(user)
headers = {'uid': self.test_uid, 'givenName': 'Daniel', 'mail': 'dhf8r@virginia.edu'}
rv = self.app.get("/v1.0/sso_backdoor/" + self.test_uid, headers=headers, follow_redirects=True,
headers = {'uid': self.test_uid, 'first_name': 'Daniel', 'email_address': 'dhf8r@virginia.edu'}
rv = self.app.get("/v1.0/sso_backdoor", headers=headers, follow_redirects=True,
content_type="application/json")
user = db.session.query(UserModel).filter(UserModel.uid == self.test_uid).first()
self.assertIsNotNone(user)
@ -40,8 +42,13 @@ class TestAuthentication(BaseTest):
self.assertIsNotNone(user.email_address)
def test_current_user_status(self):
self.load_example_data()
rv = self.app.get('/v1.0/user')
self.assert_failure(rv, 401)
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers())
self.assert_success(rv)
user = UserModel(uid="ajl2j", first_name='Aaron', last_name='Louie', email_address='ajl2j@virginia.edu')
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers(user))
self.assert_success(rv)