Merge pull request #115 from sartography/feature/swagger_admin_authentication

Feature/swagger admin authentication
This commit is contained in:
Aaron Louie 2020-06-11 19:20:04 -04:00 committed by GitHub
commit d7e8f22c5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 403 additions and 182 deletions

36
Pipfile.lock generated
View File

@ -104,17 +104,17 @@
},
"celery": {
"hashes": [
"sha256:9ae2e73b93cc7d6b48b56aaf49a68c91752d0ffd7dfdcc47f842ca79a6f13eae",
"sha256:c2037b6a8463da43b19969a0fc13f9023ceca6352b4dd51be01c66fbbb13647e"
"sha256:c3f4173f83ceb5a5c986c5fdaefb9456de3b0729a72a5776e46bd405fda7b647",
"sha256:d1762d6065522879f341c3d67c2b9fe4615eb79756d59acb1434601d4aca474b"
],
"version": "==4.4.4"
"version": "==4.4.5"
},
"certifi": {
"hashes": [
"sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304",
"sha256:51fcb31174be6e6664c5f69e3e1691a2d72a1a12e90f872cbdb1567eb47b6519"
"sha256:5ad7e9a056d25ffa5082862e36f119f7f7cec6457fa07ee2f8c339814b80c9b1",
"sha256:9cd41137dc19af6a5e03b630eefe7d1f458d964d406342dd3edf625839b944cc"
],
"version": "==2020.4.5.1"
"version": "==2020.4.5.2"
},
"cffi": {
"hashes": [
@ -285,11 +285,11 @@
},
"flask-marshmallow": {
"hashes": [
"sha256:6e6aec171b8e092e0eafaf035ff5b8637bf3a58ab46f568c4c1bab02f2a3c196",
"sha256:a1685536e7ab5abdc712bbc1ac1a6b0b50951a368502f7985e7d1c27b3c21e59"
"sha256:1da1e6454a56a3e15107b987121729f152325bdef23f3df2f9b52bbd074af38e",
"sha256:aefc1f1d96256c430a409f08241bab75ffe97e5d14ac5d1f000764e39bf4873a"
],
"index": "pypi",
"version": "==0.12.0"
"version": "==0.13.0"
},
"flask-migrate": {
"hashes": [
@ -359,10 +359,10 @@
},
"inflection": {
"hashes": [
"sha256:32a5c3341d9583ec319548b9015b7fbdf8c429cbcb575d326c33ae3a0e90d52c",
"sha256:9a15d3598f01220e93f2207c432cfede50daff53137ce660fb8be838ef1ca6cc"
"sha256:88b101b2668a1d81d6d72d4c2018e53bc6c7fc544c987849da1c7f77545c3bc9",
"sha256:f576e85132d34f5bf7df5183c2c6f94cfb32e528f53065345cf71329ba0b8924"
],
"version": "==0.4.0"
"version": "==0.5.0"
},
"itsdangerous": {
"hashes": [
@ -751,11 +751,11 @@
},
"sphinx": {
"hashes": [
"sha256:779a519adbd3a70fc7c468af08c5e74829868b0a5b34587b33340e010291856c",
"sha256:ea64df287958ee5aac46be7ac2b7277305b0381d213728c3a49d8bb9b8415807"
"sha256:1c445320a3310baa5ccb8d957267ef4a0fc930dc1234db5098b3d7af14fbb242",
"sha256:7d3d5087e39ab5a031b75588e9859f011de70e213cd0080ccbc28079fb0786d1"
],
"index": "pypi",
"version": "==3.0.4"
"version": "==3.1.0"
},
"sphinxcontrib-applehelp": {
"hashes": [
@ -990,10 +990,10 @@
},
"wcwidth": {
"hashes": [
"sha256:980fbf4f3c196c0f329cdcd1e84c554d6a211f18e252e525a0cf4223154a41d6",
"sha256:edbc2b718b4db6cdf393eefe3a420183947d6aa312505ce6754516f458ff8830"
"sha256:79375666b9954d4a1a10739315816324c3e73110af9d0e102d906fdb0aec009f",
"sha256:8c6b5b6ee1360b842645f336d9e5d68c55817c26d3050f46b235ef2bc650e48f"
],
"version": "==0.2.3"
"version": "==0.2.4"
},
"zipp": {
"hashes": [

View File

@ -9,9 +9,10 @@ JSON_SORT_KEYS = False # CRITICAL. Do not sort the data when returning values
NAME = "CR Connect Workflow"
FLASK_PORT = environ.get('PORT0') or environ.get('FLASK_PORT', default="5000")
CORS_ALLOW_ORIGINS = re.split(r',\s*', environ.get('CORS_ALLOW_ORIGINS', default="localhost:4200, localhost:5002"))
DEVELOPMENT = environ.get('DEVELOPMENT', default="true") == "true"
TESTING = environ.get('TESTING', default="false") == "true"
PRODUCTION = (environ.get('PRODUCTION', default="false") == "true") or (not DEVELOPMENT and not TESTING)
PRODUCTION = (environ.get('PRODUCTION', default="false") == "true")
TEST_UID = environ.get('TEST_UID', default="dhf8r")
ADMIN_UIDS = re.split(r',\s*', environ.get('ADMIN_UIDS', default="dhf8r,ajl2j,cah13us,cl3wf"))
# Sentry flag
ENABLE_SENTRY = environ.get('ENABLE_SENTRY', default="false") == "true"
@ -28,7 +29,7 @@ SQLALCHEMY_DATABASE_URI = environ.get(
'SQLALCHEMY_DATABASE_URI',
default="postgresql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME)
)
TOKEN_AUTH_TTL_HOURS = int(environ.get('TOKEN_AUTH_TTL_HOURS', default=4))
TOKEN_AUTH_TTL_HOURS = float(environ.get('TOKEN_AUTH_TTL_HOURS', default=24))
TOKEN_AUTH_SECRET_KEY = environ.get('TOKEN_AUTH_SECRET_KEY', default="Shhhh!!! This is secret! And better darn well not show up in prod.")
FRONTEND_AUTH_CALLBACK = environ.get('FRONTEND_AUTH_CALLBACK', default="http://localhost:4200/session")
SWAGGER_AUTH_KEY = environ.get('SWAGGER_AUTH_KEY', default="SWAGGER")

View File

@ -4,7 +4,6 @@ from os import environ
basedir = os.path.abspath(os.path.dirname(__file__))
NAME = "CR Connect Workflow"
DEVELOPMENT = True
TESTING = True
TOKEN_AUTH_SECRET_KEY = "Shhhh!!! This is secret! And better darn well not show up in prod."
PB_ENABLED = False
@ -23,8 +22,8 @@ SQLALCHEMY_DATABASE_URI = environ.get(
'SQLALCHEMY_DATABASE_URI',
default="postgresql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME)
)
ADMIN_UIDS = ['dhf8r']
print('### USING TESTING CONFIG: ###')
print('SQLALCHEMY_DATABASE_URI = ', SQLALCHEMY_DATABASE_URI)
print('DEVELOPMENT = ', DEVELOPMENT)
print('TESTING = ', TESTING)

View File

@ -2,7 +2,6 @@ import os
basedir = os.path.abspath(os.path.dirname(__file__))
NAME = "CR Connect Workflow"
DEVELOPMENT = True
TESTING = True
SQLALCHEMY_DATABASE_URI = "postgresql://postgres:@localhost:5432/crc_test"
TOKEN_AUTH_TTL_HOURS = 2
@ -12,6 +11,5 @@ PB_ENABLED = False
print('+++ USING TRAVIS TESTING CONFIG: +++')
print('SQLALCHEMY_DATABASE_URI = ', SQLALCHEMY_DATABASE_URI)
print('DEVELOPMENT = ', DEVELOPMENT)
print('TESTING = ', TESTING)
print('FRONTEND_AUTH_CALLBACK = ', FRONTEND_AUTH_CALLBACK)

View File

@ -57,15 +57,16 @@ env = Environment(loader=FileSystemLoader(template_dir))
mail = Mail(app)
print('=== USING THESE CONFIG SETTINGS: ===')
print('DB_HOST = ', )
print('CORS_ALLOW_ORIGINS = ', app.config['CORS_ALLOW_ORIGINS'])
print('DEVELOPMENT = ', app.config['DEVELOPMENT'])
print('TESTING = ', app.config['TESTING'])
print('PRODUCTION = ', app.config['PRODUCTION'])
print('PB_BASE_URL = ', app.config['PB_BASE_URL'])
print('LDAP_URL = ', app.config['LDAP_URL'])
print('APPLICATION_ROOT = ', app.config['APPLICATION_ROOT'])
print('CORS_ALLOW_ORIGINS = ', app.config['CORS_ALLOW_ORIGINS'])
print('DB_HOST = ', app.config['DB_HOST'])
print('LDAP_URL = ', app.config['LDAP_URL'])
print('PB_BASE_URL = ', app.config['PB_BASE_URL'])
print('PB_ENABLED = ', app.config['PB_ENABLED'])
print('PRODUCTION = ', app.config['PRODUCTION'])
print('TESTING = ', app.config['TESTING'])
print('TEST_UID = ', app.config['TEST_UID'])
print('ADMIN_UIDS = ', app.config['ADMIN_UIDS'])
@app.cli.command()
def load_example_data():

View File

@ -9,54 +9,18 @@ servers:
security:
- jwt: ['secret']
paths:
/sso_backdoor:
/login:
get:
operationId: crc.api.user.backdoor
summary: A backdoor that allows someone to log in as a specific user, if they
are in a staging environment.
operationId: crc.api.user.login
summary: In production, logs the user in via SSO. If not in production, logs in as a specific user for testing.
security: [] # Disable security for this endpoint only.
parameters:
- name: uid
in: query
required: true
schema:
type: string
- name: email_address
in: query
required: false
schema:
type: string
- name: display_name
in: query
required: false
schema:
type: string
- name: affiliation
in: query
required: false
schema:
type: string
- name: eppn
in: query
required: false
schema:
type: string
- name: first_name
in: query
required: false
schema:
type: string
- name: last_name
in: query
required: false
schema:
type: string
- name: title
in: query
required: false
schema:
type: string
- name: redirect
- name: redirect_url
in: query
required: false
schema:
@ -150,6 +114,8 @@ paths:
$ref: "#/components/schemas/Study"
delete:
operationId: crc.api.study.delete_study
security:
- auth_admin: ['secret']
summary: Removes the given study completely.
tags:
- Studies
@ -251,6 +217,8 @@ paths:
$ref: "#/components/schemas/WorkflowSpec"
put:
operationId: crc.api.workflow.update_workflow_specification
security:
- auth_admin: ['secret']
summary: Modifies an existing workflow specification with the given parameters.
tags:
- Workflow Specifications
@ -268,6 +236,8 @@ paths:
$ref: "#/components/schemas/WorkflowSpec"
delete:
operationId: crc.api.workflow.delete_workflow_specification
security:
- auth_admin: ['secret']
summary: Removes an existing workflow specification
tags:
- Workflow Specifications
@ -313,6 +283,8 @@ paths:
$ref: "#/components/schemas/WorkflowSpecCategory"
post:
operationId: crc.api.workflow.add_workflow_spec_category
security:
- auth_admin: ['secret']
summary: Creates a new workflow spec category with the given parameters.
tags:
- Workflow Specification Category
@ -350,6 +322,8 @@ paths:
$ref: "#/components/schemas/WorkflowSpecCategory"
put:
operationId: crc.api.workflow.update_workflow_spec_category
security:
- auth_admin: ['secret']
summary: Modifies an existing workflow spec category with the given parameters.
tags:
- Workflow Specification Category
@ -367,6 +341,8 @@ paths:
$ref: "#/components/schemas/WorkflowSpecCategory"
delete:
operationId: crc.api.workflow.delete_workflow_spec_category
security:
- auth_admin: ['secret']
summary: Removes an existing workflow spec category
tags:
- Workflow Specification Category
@ -566,6 +542,8 @@ paths:
example: '<?xml version="1.0" encoding="UTF-8"?><bpmn:definitions></bpmn:definitions>'
put:
operationId: crc.api.file.set_reference_file
security:
- auth_admin: ['secret']
summary: Update the contents of a named reference file.
tags:
- Files
@ -624,6 +602,8 @@ paths:
$ref: "#/components/schemas/Workflow"
delete:
operationId: crc.api.workflow.delete_workflow
security:
- auth_admin: ['secret']
summary: Removes an existing workflow
tags:
- Workflows and Tasks
@ -944,6 +924,11 @@ components:
scheme: bearer
bearerFormat: JWT
x-bearerInfoFunc: crc.api.user.verify_token
auth_admin:
type: http
scheme: bearer
bearerFormat: JWT
x-bearerInfoFunc: crc.api.user.verify_token_admin
schemas:
User:
properties:

View File

@ -10,12 +10,32 @@ from crc.services.ldap_service import LdapService, LdapModel
.. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers
"""
def verify_token(token):
failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate", status_code=403)
if (not 'PRODUCTION' in app.config or not app.config['PRODUCTION']) and token == app.config["SWAGGER_AUTH_KEY"]:
def verify_token(token=None):
"""
Verifies the token for the user (if provided). If in production environment and token is not provided,
gets user from the SSO headers and returns their token.
Args:
token: Optional[str]
Returns:
token: str
Raises:
ApiError. If not on production and token is not valid, returns an 'invalid_token' 403 error.
If on production and user is not authenticated, returns a 'no_user' 403 error.
"""
failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate",
status_code=403)
if not _is_production():
g.user = UserModel.query.first()
token = g.user.encode_auth_token()
if token:
try:
token_info = UserModel.decode_auth_token(token)
g.user = UserModel.query.filter_by(uid=token_info['sub']).first()
@ -26,13 +46,77 @@ def verify_token(token):
else:
raise failure_error
# If there's no token and we're in production, get the user from the SSO headers and return their token
if not token and _is_production():
uid = _get_request_uid(request)
if uid is not None:
db_user = UserModel.query.filter_by(uid=uid).first()
if db_user is not None:
g.user = db_user
token = g.user.encode_auth_token().decode()
token_info = UserModel.decode_auth_token(token)
return token_info
else:
raise ApiError("no_user", "User not found. Please login via the frontend app before accessing this feature.",
status_code=403)
def verify_token_admin(token=None):
"""
Verifies the token for the user (if provided) in non-production environment. If in production environment,
checks that the user is in the list of authorized admins
Args:
token: Optional[str]
Returns:
token: str
"""
# If this is production, check that the user is in the list of admins
if _is_production():
uid = _get_request_uid(request)
if uid is not None and uid in app.config['ADMIN_UIDS']:
return verify_token()
# If we're not in production, just use the normal verify_token method
else:
return verify_token(token)
def get_current_user():
return UserModelSchema().dump(g.user)
@app.route('/v1.0/login')
def sso_login():
# This what I see coming back:
def login(
uid=None,
redirect_url=None,
):
"""
In non-production environment, provides an endpoint for end-to-end system testing that allows the system
to simulate logging in as a specific user. In production environment, simply logs user in via single-sign-on
(SSO) Shibboleth authentication headers.
Args:
uid: Optional[str]
redirect_url: Optional[str]
Returns:
str. If not on production, returns the frontend auth callback URL, with auth token appended.
If on production and user is authenticated via SSO, returns the frontend auth callback URL,
with auth token appended.
Raises:
ApiError. If on production and user is not authenticated, returns a 404 error.
"""
# ----------------------------------------
# Shibboleth Authentication Headers
# ----------------------------------------
# X-Remote-Cn: Daniel Harold Funk (dhf8r)
# X-Remote-Sn: Funk
# X-Remote-Givenname: Daniel
@ -47,36 +131,40 @@ def sso_login():
# X-Forwarded-Host: dev.crconnect.uvadcos.io
# X-Forwarded-Server: dev.crconnect.uvadcos.io
# Connection: Keep-Alive
uid = request.headers.get("Uid")
if not uid:
uid = request.headers.get("X-Remote-Uid")
if not uid:
raise ApiError("invalid_sso_credentials", "'Uid' nor 'X-Remote-Uid' were present in the headers: %s"
% str(request.headers))
# If we're in production, override any uid with the uid from the SSO request headers
if _is_production():
uid = _get_request_uid(request)
redirect = request.args.get('redirect')
if uid:
app.logger.info("SSO_LOGIN: Full URL: " + request.url)
app.logger.info("SSO_LOGIN: User Id: " + uid)
app.logger.info("SSO_LOGIN: Will try to redirect to : " + str(redirect))
info = LdapService.user_info(uid)
return _handle_login(info, redirect)
app.logger.info("SSO_LOGIN: Will try to redirect to : " + str(redirect_url))
ldap_info = LdapService().user_info(uid)
if ldap_info:
return _handle_login(ldap_info, redirect_url)
raise ApiError('404', 'unknown')
@app.route('/sso')
def sso():
response = ""
response += "<h1>Headers</h1>"
response += "<ul>"
for k,v in request.headers:
for k, v in request.headers:
response += "<li><b>%s</b> %s</li>\n" % (k, v)
response += "<h1>Environment</h1>"
for k,v in request.environ:
for k, v in request.environ:
response += "<li><b>%s</b> %s</li>\n" % (k, v)
return response
def _handle_login(user_info: LdapModel, redirect_url=app.config['FRONTEND_AUTH_CALLBACK']):
"""On successful login, adds user to database if the user is not already in the system,
def _handle_login(user_info: LdapModel, redirect_url=None):
"""
On successful login, adds user to database if the user is not already in the system,
then returns the frontend auth callback URL, with auth token appended.
Args:
@ -86,20 +174,7 @@ def _handle_login(user_info: LdapModel, redirect_url=app.config['FRONTEND_AUTH_C
Returns:
Response. 302 - Redirects to the frontend auth callback URL, with auth token appended.
"""
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
if user is None:
# Add new user
user = UserModel()
user.uid = user_info.uid
user.display_name = user_info.display_name
user.email_address = user_info.email_address
user.affiliation = user_info.affiliation
user.title = user_info.title
db.session.add(user)
db.session.commit()
user = _upsert_user(user_info)
# Return the frontend auth callback URL, with auth token appended.
auth_token = user.encode_auth_token().decode()
@ -114,41 +189,44 @@ def _handle_login(user_info: LdapModel, redirect_url=app.config['FRONTEND_AUTH_C
return auth_token
def _upsert_user(user_info):
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
def backdoor(
uid=None,
affiliation=None,
display_name=None,
email_address=None,
eppn=None,
first_name=None,
last_name=None,
title=None,
redirect=None,
):
"""A backdoor for end-to-end system testing that allows the system to simulate logging in as a specific user.
Only works if the application is running in a non-production environment.
Args:
uid: str
affiliation: Optional[str]
display_name: Optional[str]
email_address: Optional[str]
eppn: Optional[str]
first_name: Optional[str]
last_name: Optional[str]
title: Optional[str]
redirect_url: Optional[str]
Returns:
str. If not on production, returns the frontend auth callback URL, with auth token appended.
Raises:
ApiError. If on production, returns a 404 error.
"""
if not 'PRODUCTION' in app.config or not app.config['PRODUCTION']:
ldap_info = LdapService.user_info(uid)
return _handle_login(ldap_info, redirect)
if user is None:
# Add new user
user = UserModel()
else:
raise ApiError('404', 'unknown')
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).with_for_update().first()
user.uid = user_info.uid
user.display_name = user_info.display_name
user.email_address = user_info.email_address
user.affiliation = user_info.affiliation
user.title = user_info.title
db.session.add(user)
db.session.commit()
return user
def _get_request_uid(req):
uid = None
if _is_production():
if 'user' in g and g.user is not None:
return g.user.uid
uid = req.headers.get("Uid")
if not uid:
uid = req.headers.get("X-Remote-Uid")
if not uid:
raise ApiError("invalid_sso_credentials", "'Uid' nor 'X-Remote-Uid' were present in the headers: %s"
% str(req.headers))
return uid
def _is_production():
return 'PRODUCTION' in app.config and app.config['PRODUCTION']

View File

@ -129,7 +129,7 @@ def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
workflow_spec_id=processor.workflow_spec_id,
spec_version=processor.get_version_string(),
is_latest_spec=processor.is_latest_spec,
total_tasks=processor.workflow_model.total_tasks,
total_tasks=len(navigation),
completed_tasks=processor.workflow_model.completed_tasks,
last_updated=processor.workflow_model.last_updated,
title=spec.display_name

View File

@ -27,7 +27,7 @@ class UserModel(db.Model):
Generates the Auth Token
:return: string
"""
hours = int(app.config['TOKEN_AUTH_TTL_HOURS'])
hours = float(app.config['TOKEN_AUTH_TTL_HOURS'])
payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=hours, minutes=0, seconds=0),
'iat': datetime.datetime.utcnow(),
@ -36,7 +36,7 @@ class UserModel(db.Model):
return jwt.encode(
payload,
app.config.get('TOKEN_AUTH_SECRET_KEY'),
algorithm='HS256'
algorithm='HS256',
)
@staticmethod
@ -50,9 +50,9 @@ class UserModel(db.Model):
payload = jwt.decode(auth_token, app.config.get('TOKEN_AUTH_SECRET_KEY'), algorithms='HS256')
return payload
except jwt.ExpiredSignatureError:
raise ApiError('token_expired', 'The Authentication token you provided expired, and must be renewed.')
raise ApiError('token_expired', 'The Authentication token you provided expired and must be renewed.')
except jwt.InvalidTokenError:
raise ApiError('token_invalid', 'The Authentication token you provided. You need a new token. ')
raise ApiError('token_invalid', 'The Authentication token you provided is invalid. You need a new token. ')
class UserModelSchema(SQLAlchemyAutoSchema):

View File

@ -2,6 +2,7 @@
# IMPORTANT - Environment must be loaded before app, models, etc....
import os
from flask import g
from sqlalchemy import Sequence
os.environ["TESTING"] = "true"
@ -95,7 +96,7 @@ class BaseTest(unittest.TestCase):
def tearDown(self):
ExampleDataLoader.clean_db()
session.flush()
g.user = None
self.auths = {}
def logged_in_headers(self, user=None, redirect_url='http://some/frontend/url'):
@ -107,7 +108,8 @@ class BaseTest(unittest.TestCase):
user_info = {'uid': user.uid}
query_string = self.user_info_to_query_string(user_info, redirect_url)
rv = self.app.get("/v1.0/sso_backdoor%s" % query_string, follow_redirects=False)
rv = self.app.get("/v1.0/login%s" % query_string, follow_redirects=False)
self.assertTrue(rv.status_code == 302)
self.assertTrue(str.startswith(rv.location, redirect_url))
@ -198,7 +200,7 @@ class BaseTest(unittest.TestCase):
for key, value in items:
query_string_list.append('%s=%s' % (key, urllib.parse.quote(value)))
query_string_list.append('redirect=%s' % redirect_url)
query_string_list.append('redirect_url=%s' % redirect_url)
return '?%s' % '&'.join(query_string_list)

View File

@ -1,19 +1,63 @@
from tests.base_test import BaseTest
import json
from calendar import timegm
from datetime import timezone, datetime, timedelta
from crc import db
import jwt
from tests.base_test import BaseTest
from crc import db, app
from crc.api.common import ApiError
from crc.models.protocol_builder import ProtocolBuilderStatus
from crc.models.study import StudySchema, StudyModel
from crc.models.user import UserModel
class TestAuthentication(BaseTest):
def test_auth_token(self):
self.load_example_data()
user = UserModel(uid="dhf8r")
auth_token = user.encode_auth_token()
self.assertTrue(isinstance(auth_token, bytes))
self.assertEqual("dhf8r", user.decode_auth_token(auth_token).get("sub"))
def tearDown(self):
# Assure we set the production flag back to false.
app.config['PRODUCTION'] = False
super().tearDown()
def test_backdoor_auth_creates_user(self):
def test_auth_token(self):
# Save the orginal timeout setting
orig_ttl = float(app.config['TOKEN_AUTH_TTL_HOURS'])
self.load_example_data()
# Set the timeout to something else
new_ttl = 4.0
app.config['TOKEN_AUTH_TTL_HOURS'] = new_ttl
user_1 = UserModel(uid="dhf8r")
expected_exp_1 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
auth_token_1 = user_1.encode_auth_token()
self.assertTrue(isinstance(auth_token_1, bytes))
self.assertEqual("dhf8r", user_1.decode_auth_token(auth_token_1).get("sub"))
actual_exp_1 = user_1.decode_auth_token(auth_token_1).get("exp")
self.assertTrue(expected_exp_1 - 1000 <= actual_exp_1 <= expected_exp_1 + 1000)
# Set the timeout to something else
neg_ttl = -0.01
app.config['TOKEN_AUTH_TTL_HOURS'] = neg_ttl
user_2 = UserModel(uid="dhf8r")
expected_exp_2 = timegm((datetime.utcnow() + timedelta(hours=neg_ttl)).utctimetuple())
auth_token_2 = user_2.encode_auth_token()
self.assertTrue(isinstance(auth_token_2, bytes))
with self.assertRaises(ApiError) as api_error:
with self.assertRaises(jwt.exceptions.ExpiredSignatureError):
user_2.decode_auth_token(auth_token_2)
self.assertEqual(api_error.exception.status_code, 400, 'Should raise an API Error if token is expired')
# Set the timeout back to where it was
app.config['TOKEN_AUTH_TTL_HOURS'] = orig_ttl
user_3 = UserModel(uid="dhf8r")
expected_exp_3 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
auth_token_3 = user_3.encode_auth_token()
self.assertTrue(isinstance(auth_token_3, bytes))
actual_exp_3 = user_3.decode_auth_token(auth_token_1).get("exp")
self.assertTrue(expected_exp_3 - 1000 <= actual_exp_3 <= expected_exp_3 + 1000)
def test_non_production_auth_creates_user(self):
new_uid = 'lb3dp' ## Assure this user id is in the fake responses from ldap.
self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
@ -23,7 +67,7 @@ class TestAuthentication(BaseTest):
'email_address': 'czn1z@virginia.edu'}
redirect_url = 'http://worlds.best.website/admin'
query_string = self.user_info_to_query_string(user_info, redirect_url)
url = '/v1.0/sso_backdoor%s' % query_string
url = '/v1.0/login%s' % query_string
rv_1 = self.app.get(url, follow_redirects=False)
self.assertTrue(rv_1.status_code == 302)
self.assertTrue(str.startswith(rv_1.location, redirect_url))
@ -38,22 +82,30 @@ class TestAuthentication(BaseTest):
self.assertTrue(rv_2.status_code == 302)
self.assertTrue(str.startswith(rv_2.location, redirect_url))
def test_normal_auth_creates_user(self):
new_uid = 'lb3dp' # This user is in the test ldap system.
def test_production_auth_creates_user(self):
# Switch production mode on
app.config['PRODUCTION'] = True
self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
new_uid = 'lb3dp' # This user is in the test ldap system.
user = db.session.query(UserModel).filter_by(uid=new_uid).first()
self.assertIsNone(user)
redirect_url = 'http://worlds.best.website/admin'
headers = dict(Uid=new_uid)
db.session.flush()
rv = self.app.get('v1.0/login', follow_redirects=False, headers=headers)
self.assert_success(rv)
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
user = db.session.query(UserModel).filter_by(uid=new_uid).first()
self.assertIsNotNone(user)
self.assertEqual(new_uid, user.uid)
self.assertEqual("Laura Barnes", user.display_name)
self.assertEqual("lb3dp@virginia.edu", user.email_address)
self.assertEqual("E0:Associate Professor of Systems and Information Engineering", user.title)
# Switch production mode back off
app.config['PRODUCTION'] = False
def test_current_user_status(self):
self.load_example_data()
@ -67,3 +119,108 @@ class TestAuthentication(BaseTest):
user = UserModel(uid="dhf8r", first_name='Dan', last_name='Funk', email_address='dhf8r@virginia.edu')
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers(user, redirect_url='http://omg.edu/lolwut'))
self.assert_success(rv)
def test_admin_can_access_admin_only_endpoints(self):
# Switch production mode on
app.config['PRODUCTION'] = True
self.load_example_data()
admin_uids = app.config['ADMIN_UIDS']
self.assertGreater(len(admin_uids), 0)
admin_uid = admin_uids[0]
self.assertEqual(admin_uid, 'dhf8r') # This user is in the test ldap system.
admin_headers = dict(Uid=admin_uid)
rv = self.app.get('v1.0/login', follow_redirects=False, headers=admin_headers)
self.assert_success(rv)
admin_user = db.session.query(UserModel).filter(UserModel.uid == admin_uid).first()
self.assertIsNotNone(admin_user)
self.assertEqual(admin_uid, admin_user.uid)
admin_study = self._make_fake_study(admin_uid)
admin_token_headers = dict(Authorization='Bearer ' + admin_user.encode_auth_token().decode())
rv_add_study = self.app.post(
'/v1.0/study',
content_type="application/json",
headers=admin_token_headers,
data=json.dumps(StudySchema().dump(admin_study)),
follow_redirects=False
)
self.assert_success(rv_add_study, 'Admin user should be able to add a study')
new_admin_study = json.loads(rv_add_study.get_data(as_text=True))
db_admin_study = db.session.query(StudyModel).filter_by(id=new_admin_study['id']).first()
self.assertIsNotNone(db_admin_study)
rv_del_study = self.app.delete(
'/v1.0/study/%i' % db_admin_study.id,
follow_redirects=False,
headers=admin_token_headers
)
self.assert_success(rv_del_study, 'Admin user should be able to delete a study')
# Switch production mode back off
app.config['PRODUCTION'] = False
def test_nonadmin_cannot_access_admin_only_endpoints(self):
# Switch production mode on
app.config['PRODUCTION'] = True
self.load_example_data()
# Non-admin user should not be able to delete a study
non_admin_uid = 'lb3dp'
admin_uids = app.config['ADMIN_UIDS']
self.assertGreater(len(admin_uids), 0)
self.assertNotIn(non_admin_uid, admin_uids)
non_admin_headers = dict(Uid=non_admin_uid)
rv = self.app.get(
'v1.0/login',
follow_redirects=False,
headers=non_admin_headers
)
self.assert_success(rv)
non_admin_user = db.session.query(UserModel).filter_by(uid=non_admin_uid).first()
self.assertIsNotNone(non_admin_user)
non_admin_token_headers = dict(Authorization='Bearer ' + non_admin_user.encode_auth_token().decode())
non_admin_study = self._make_fake_study(non_admin_uid)
rv_add_study = self.app.post(
'/v1.0/study',
content_type="application/json",
headers=non_admin_token_headers,
data=json.dumps(StudySchema().dump(non_admin_study))
)
self.assert_success(rv_add_study, 'Non-admin user should be able to add a study')
new_non_admin_study = json.loads(rv_add_study.get_data(as_text=True))
db_non_admin_study = db.session.query(StudyModel).filter_by(id=new_non_admin_study['id']).first()
self.assertIsNotNone(db_non_admin_study)
rv_non_admin_del_study = self.app.delete(
'/v1.0/study/%i' % db_non_admin_study.id,
follow_redirects=False,
headers=non_admin_token_headers
)
self.assert_failure(rv_non_admin_del_study, 401)
# Switch production mode back off
app.config['PRODUCTION'] = False
def _make_fake_study(self, uid):
return {
"title": "blah",
"last_updated": datetime.now(tz=timezone.utc),
"protocol_builder_status": ProtocolBuilderStatus.ACTIVE,
"primary_investigator_id": uid,
"user_uid": uid,
}