Fixes broken unit tests. But still broken.

This commit is contained in:
Aaron Louie 2020-06-11 11:29:58 -04:00
parent 1f5554d0d8
commit cccff9b856
7 changed files with 156 additions and 95 deletions

36
Pipfile.lock generated
View File

@ -104,17 +104,17 @@
}, },
"celery": { "celery": {
"hashes": [ "hashes": [
"sha256:9ae2e73b93cc7d6b48b56aaf49a68c91752d0ffd7dfdcc47f842ca79a6f13eae", "sha256:c3f4173f83ceb5a5c986c5fdaefb9456de3b0729a72a5776e46bd405fda7b647",
"sha256:c2037b6a8463da43b19969a0fc13f9023ceca6352b4dd51be01c66fbbb13647e" "sha256:d1762d6065522879f341c3d67c2b9fe4615eb79756d59acb1434601d4aca474b"
], ],
"version": "==4.4.4" "version": "==4.4.5"
}, },
"certifi": { "certifi": {
"hashes": [ "hashes": [
"sha256:1d987a998c75633c40847cc966fcf5904906c920a7f17ef374f5aa4282abd304", "sha256:5ad7e9a056d25ffa5082862e36f119f7f7cec6457fa07ee2f8c339814b80c9b1",
"sha256:51fcb31174be6e6664c5f69e3e1691a2d72a1a12e90f872cbdb1567eb47b6519" "sha256:9cd41137dc19af6a5e03b630eefe7d1f458d964d406342dd3edf625839b944cc"
], ],
"version": "==2020.4.5.1" "version": "==2020.4.5.2"
}, },
"cffi": { "cffi": {
"hashes": [ "hashes": [
@ -285,11 +285,11 @@
}, },
"flask-marshmallow": { "flask-marshmallow": {
"hashes": [ "hashes": [
"sha256:6e6aec171b8e092e0eafaf035ff5b8637bf3a58ab46f568c4c1bab02f2a3c196", "sha256:1da1e6454a56a3e15107b987121729f152325bdef23f3df2f9b52bbd074af38e",
"sha256:a1685536e7ab5abdc712bbc1ac1a6b0b50951a368502f7985e7d1c27b3c21e59" "sha256:aefc1f1d96256c430a409f08241bab75ffe97e5d14ac5d1f000764e39bf4873a"
], ],
"index": "pypi", "index": "pypi",
"version": "==0.12.0" "version": "==0.13.0"
}, },
"flask-migrate": { "flask-migrate": {
"hashes": [ "hashes": [
@ -359,10 +359,10 @@
}, },
"inflection": { "inflection": {
"hashes": [ "hashes": [
"sha256:32a5c3341d9583ec319548b9015b7fbdf8c429cbcb575d326c33ae3a0e90d52c", "sha256:88b101b2668a1d81d6d72d4c2018e53bc6c7fc544c987849da1c7f77545c3bc9",
"sha256:9a15d3598f01220e93f2207c432cfede50daff53137ce660fb8be838ef1ca6cc" "sha256:f576e85132d34f5bf7df5183c2c6f94cfb32e528f53065345cf71329ba0b8924"
], ],
"version": "==0.4.0" "version": "==0.5.0"
}, },
"itsdangerous": { "itsdangerous": {
"hashes": [ "hashes": [
@ -751,11 +751,11 @@
}, },
"sphinx": { "sphinx": {
"hashes": [ "hashes": [
"sha256:779a519adbd3a70fc7c468af08c5e74829868b0a5b34587b33340e010291856c", "sha256:1c445320a3310baa5ccb8d957267ef4a0fc930dc1234db5098b3d7af14fbb242",
"sha256:ea64df287958ee5aac46be7ac2b7277305b0381d213728c3a49d8bb9b8415807" "sha256:7d3d5087e39ab5a031b75588e9859f011de70e213cd0080ccbc28079fb0786d1"
], ],
"index": "pypi", "index": "pypi",
"version": "==3.0.4" "version": "==3.1.0"
}, },
"sphinxcontrib-applehelp": { "sphinxcontrib-applehelp": {
"hashes": [ "hashes": [
@ -990,10 +990,10 @@
}, },
"wcwidth": { "wcwidth": {
"hashes": [ "hashes": [
"sha256:980fbf4f3c196c0f329cdcd1e84c554d6a211f18e252e525a0cf4223154a41d6", "sha256:79375666b9954d4a1a10739315816324c3e73110af9d0e102d906fdb0aec009f",
"sha256:edbc2b718b4db6cdf393eefe3a420183947d6aa312505ce6754516f458ff8830" "sha256:8c6b5b6ee1360b842645f336d9e5d68c55817c26d3050f46b235ef2bc650e48f"
], ],
"version": "==0.2.3" "version": "==0.2.4"
}, },
"zipp": { "zipp": {
"hashes": [ "hashes": [

View File

@ -29,7 +29,7 @@ SQLALCHEMY_DATABASE_URI = environ.get(
'SQLALCHEMY_DATABASE_URI', 'SQLALCHEMY_DATABASE_URI',
default="postgresql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME) default="postgresql://%s:%s@%s:%s/%s" % (DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME)
) )
TOKEN_AUTH_TTL_HOURS = int(environ.get('TOKEN_AUTH_TTL_HOURS', default=4)) TOKEN_AUTH_TTL_HOURS = float(environ.get('TOKEN_AUTH_TTL_HOURS', default=24))
TOKEN_AUTH_SECRET_KEY = environ.get('TOKEN_AUTH_SECRET_KEY', default="Shhhh!!! This is secret! And better darn well not show up in prod.") TOKEN_AUTH_SECRET_KEY = environ.get('TOKEN_AUTH_SECRET_KEY', default="Shhhh!!! This is secret! And better darn well not show up in prod.")
FRONTEND_AUTH_CALLBACK = environ.get('FRONTEND_AUTH_CALLBACK', default="http://localhost:4200/session") FRONTEND_AUTH_CALLBACK = environ.get('FRONTEND_AUTH_CALLBACK', default="http://localhost:4200/session")
SWAGGER_AUTH_KEY = environ.get('SWAGGER_AUTH_KEY', default="SWAGGER") SWAGGER_AUTH_KEY = environ.get('SWAGGER_AUTH_KEY', default="SWAGGER")

View File

@ -115,7 +115,7 @@ paths:
delete: delete:
operationId: crc.api.study.delete_study operationId: crc.api.study.delete_study
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Removes the given study completely. summary: Removes the given study completely.
tags: tags:
- Studies - Studies
@ -218,7 +218,7 @@ paths:
put: put:
operationId: crc.api.workflow.update_workflow_specification operationId: crc.api.workflow.update_workflow_specification
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Modifies an existing workflow specification with the given parameters. summary: Modifies an existing workflow specification with the given parameters.
tags: tags:
- Workflow Specifications - Workflow Specifications
@ -237,7 +237,7 @@ paths:
delete: delete:
operationId: crc.api.workflow.delete_workflow_specification operationId: crc.api.workflow.delete_workflow_specification
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Removes an existing workflow specification summary: Removes an existing workflow specification
tags: tags:
- Workflow Specifications - Workflow Specifications
@ -284,7 +284,7 @@ paths:
post: post:
operationId: crc.api.workflow.add_workflow_spec_category operationId: crc.api.workflow.add_workflow_spec_category
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Creates a new workflow spec category with the given parameters. summary: Creates a new workflow spec category with the given parameters.
tags: tags:
- Workflow Specification Category - Workflow Specification Category
@ -323,7 +323,7 @@ paths:
put: put:
operationId: crc.api.workflow.update_workflow_spec_category operationId: crc.api.workflow.update_workflow_spec_category
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Modifies an existing workflow spec category with the given parameters. summary: Modifies an existing workflow spec category with the given parameters.
tags: tags:
- Workflow Specification Category - Workflow Specification Category
@ -342,7 +342,7 @@ paths:
delete: delete:
operationId: crc.api.workflow.delete_workflow_spec_category operationId: crc.api.workflow.delete_workflow_spec_category
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Removes an existing workflow spec category summary: Removes an existing workflow spec category
tags: tags:
- Workflow Specification Category - Workflow Specification Category
@ -543,7 +543,7 @@ paths:
put: put:
operationId: crc.api.file.set_reference_file operationId: crc.api.file.set_reference_file
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Update the contents of a named reference file. summary: Update the contents of a named reference file.
tags: tags:
- Files - Files
@ -603,7 +603,7 @@ paths:
delete: delete:
operationId: crc.api.workflow.delete_workflow operationId: crc.api.workflow.delete_workflow
security: security:
- jwt_admin: ['secret'] - auth_admin: ['secret']
summary: Removes an existing workflow summary: Removes an existing workflow
tags: tags:
- Workflows and Tasks - Workflows and Tasks
@ -924,7 +924,7 @@ components:
scheme: bearer scheme: bearer
bearerFormat: JWT bearerFormat: JWT
x-bearerInfoFunc: crc.api.user.verify_token x-bearerInfoFunc: crc.api.user.verify_token
jwt_admin: auth_admin:
type: http type: http
scheme: bearer scheme: bearer
bearerFormat: JWT bearerFormat: JWT

View File

@ -4,8 +4,7 @@ from flask import g, request
from crc import app, db from crc import app, db
from crc.api.common import ApiError from crc.api.common import ApiError
from crc.models.user import UserModel, UserModelSchema from crc.models.user import UserModel, UserModelSchema
from crc.services.ldap_service import LdapService, LdapModel, LdapUserInfo from crc.services.ldap_service import LdapService, LdapModel
from crc.services.approval_service import ApprovalService
""" """
.. module:: crc.api.user .. module:: crc.api.user
@ -31,7 +30,8 @@ def verify_token(token=None):
print('=== verify_token ===') print('=== verify_token ===')
print('_is_production()', _is_production()) print('_is_production()', _is_production())
failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate", status_code=403) failure_error = ApiError("invalid_token", "Unable to decode the token you provided. Please re-authenticate",
status_code=403)
if not _is_production(): if not _is_production():
g.user = UserModel.query.first() g.user = UserModel.query.first()
@ -62,7 +62,8 @@ def verify_token(token=None):
return token_info return token_info
else: else:
ApiError("no_user", "User not found. Please login via the frontend app before accessing this feature.", status_code=403) ApiError("no_user", "User not found. Please login via the frontend app before accessing this feature.",
status_code=403)
raise failure_error raise failure_error
@ -81,7 +82,6 @@ def verify_token_admin(token=None):
print('=== verify_token_admin ===') print('=== verify_token_admin ===')
print('_is_production()', _is_production()) print('_is_production()', _is_production())
# If this is production, check that the user is in the list of admins # If this is production, check that the user is in the list of admins
if _is_production(): if _is_production():
uid = _get_request_uid(request) uid = _get_request_uid(request)
@ -101,8 +101,8 @@ def get_current_user():
def login( def login(
uid=None, uid=None,
redirect_url=None, redirect_url=None,
): ):
""" """
In non-production environment, provides an endpoint for end-to-end system testing that allows the system In non-production environment, provides an endpoint for end-to-end system testing that allows the system
@ -175,7 +175,7 @@ def sso():
return response return response
def _handle_login(user_info: LdapUserInfo, redirect_url=None): def _handle_login(user_info: LdapModel, redirect_url=None):
""" """
On successful login, adds user to database if the user is not already in the system, On successful login, adds user to database if the user is not already in the system,
then returns the frontend auth callback URL, with auth token appended. then returns the frontend auth callback URL, with auth token appended.
@ -187,22 +187,10 @@ def _handle_login(user_info: LdapUserInfo, redirect_url=None):
Returns: Returns:
Response. 302 - Redirects to the frontend auth callback URL, with auth token appended. Response. 302 - Redirects to the frontend auth callback URL, with auth token appended.
""" """
print('=== _handle_login ===') print('=== _handle_login ===')
print('user_info', user_info) print('user_info', user_info)
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first() user = _upsert_user(user_info)
if user is None:
# Add new user
user = UserModel()
user.uid = user_info.uid
user.display_name = user_info.display_name
user.email_address = user_info.email_address
user.affiliation = user_info.affiliation
user.title = user_info.title
db.session.add(user)
db.session.commit()
# Return the frontend auth callback URL, with auth token appended. # Return the frontend auth callback URL, with auth token appended.
auth_token = user.encode_auth_token().decode() auth_token = user.encode_auth_token().decode()
@ -217,11 +205,35 @@ def _handle_login(user_info: LdapUserInfo, redirect_url=None):
return auth_token return auth_token
def _upsert_user(user_info):
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).first()
if user is None:
# Add new user
user = UserModel()
else:
user = db.session.query(UserModel).filter(UserModel.uid == user_info.uid).with_for_update().first()
user.uid = user_info.uid
user.display_name = user_info.display_name
user.email_address = user_info.email_address
user.affiliation = user_info.affiliation
user.title = user_info.title
db.session.add(user)
db.session.commit()
return user
def _get_request_uid(req): def _get_request_uid(req):
uid = None uid = None
if _is_production(): if _is_production():
if 'user' in g and g.user is not None:
print('g.user.uid', g.user.uid)
return g.user.uid
print('req.headers', req.headers) print('req.headers', req.headers)
uid = req.headers.get("Uid") uid = req.headers.get("Uid")
if not uid: if not uid:

View File

@ -27,7 +27,7 @@ class UserModel(db.Model):
Generates the Auth Token Generates the Auth Token
:return: string :return: string
""" """
hours = int(app.config['TOKEN_AUTH_TTL_HOURS']) hours = float(app.config['TOKEN_AUTH_TTL_HOURS'])
payload = { payload = {
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=hours, minutes=0, seconds=0), 'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=hours, minutes=0, seconds=0),
'iat': datetime.datetime.utcnow(), 'iat': datetime.datetime.utcnow(),
@ -36,7 +36,7 @@ class UserModel(db.Model):
return jwt.encode( return jwt.encode(
payload, payload,
app.config.get('TOKEN_AUTH_SECRET_KEY'), app.config.get('TOKEN_AUTH_SECRET_KEY'),
algorithm='HS256' algorithm='HS256',
) )
@staticmethod @staticmethod
@ -50,9 +50,9 @@ class UserModel(db.Model):
payload = jwt.decode(auth_token, app.config.get('TOKEN_AUTH_SECRET_KEY'), algorithms='HS256') payload = jwt.decode(auth_token, app.config.get('TOKEN_AUTH_SECRET_KEY'), algorithms='HS256')
return payload return payload
except jwt.ExpiredSignatureError: except jwt.ExpiredSignatureError:
raise ApiError('token_expired', 'The Authentication token you provided expired, and must be renewed.') raise ApiError('token_expired', 'The Authentication token you provided expired and must be renewed.')
except jwt.InvalidTokenError: except jwt.InvalidTokenError:
raise ApiError('token_invalid', 'The Authentication token you provided. You need a new token. ') raise ApiError('token_invalid', 'The Authentication token you provided is invalid. You need a new token. ')
class UserModelSchema(SQLAlchemyAutoSchema): class UserModelSchema(SQLAlchemyAutoSchema):

View File

@ -95,7 +95,6 @@ class BaseTest(unittest.TestCase):
def tearDown(self): def tearDown(self):
ExampleDataLoader.clean_db() ExampleDataLoader.clean_db()
session.flush()
self.auths = {} self.auths = {}
def logged_in_headers(self, user=None, redirect_url='http://some/frontend/url'): def logged_in_headers(self, user=None, redirect_url='http://some/frontend/url'):

View File

@ -1,9 +1,10 @@
import json import json
from datetime import timezone, datetime from calendar import timegm
from datetime import timezone, datetime, timedelta
from tests.base_test import BaseTest from tests.base_test import BaseTest
from crc import db, app from crc import db, app
from crc.models.study import StudySchema from crc.models.study import StudySchema, StudyModel
from crc.models.user import UserModel from crc.models.user import UserModel
from crc.models.protocol_builder import ProtocolBuilderStatus from crc.models.protocol_builder import ProtocolBuilderStatus
@ -16,11 +17,29 @@ class TestAuthentication(BaseTest):
super().tearDown() super().tearDown()
def test_auth_token(self): def test_auth_token(self):
# Save the orginal timeout setting
orig_ttl = float(app.config['TOKEN_AUTH_TTL_HOURS'])
self.load_example_data() self.load_example_data()
# Set the timeout to something else
new_ttl = 4.0
app.config['TOKEN_AUTH_TTL_HOURS'] = new_ttl
user = UserModel(uid="dhf8r") user = UserModel(uid="dhf8r")
auth_token = user.encode_auth_token() expected_exp_1 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
self.assertTrue(isinstance(auth_token, bytes)) auth_token_1 = user.encode_auth_token()
self.assertEqual("dhf8r", user.decode_auth_token(auth_token).get("sub")) self.assertTrue(isinstance(auth_token_1, bytes))
self.assertEqual("dhf8r", user.decode_auth_token(auth_token_1).get("sub"))
actual_exp_1 = user.decode_auth_token(auth_token_1).get("exp")
self.assertTrue(expected_exp_1 - 1000 <= actual_exp_1 <= expected_exp_1 + 1000)
# Set the timeout back to where it was
app.config['TOKEN_AUTH_TTL_HOURS'] = orig_ttl
expected_exp_2 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
auth_token_2 = user.encode_auth_token()
self.assertTrue(isinstance(auth_token_2, bytes))
actual_exp_2 = user.decode_auth_token(auth_token_1).get("exp")
self.assertTrue(expected_exp_2 - 1000 <= actual_exp_2 <= expected_exp_2 + 1000)
def test_non_production_auth_creates_user(self): def test_non_production_auth_creates_user(self):
new_uid = 'lb3dp' ## Assure this user id is in the fake responses from ldap. new_uid = 'lb3dp' ## Assure this user id is in the fake responses from ldap.
@ -48,12 +67,14 @@ class TestAuthentication(BaseTest):
self.assertTrue(str.startswith(rv_2.location, redirect_url)) self.assertTrue(str.startswith(rv_2.location, redirect_url))
def test_production_auth_creates_user(self): def test_production_auth_creates_user(self):
# Switch production mode on # Switch production mode on
app.config['PRODUCTION'] = True app.config['PRODUCTION'] = True
new_uid = 'lb3dp' # This user is in the test ldap system.
self.load_example_data() self.load_example_data()
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
new_uid = 'lb3dp' # This user is in the test ldap system.
user = db.session.query(UserModel).filter_by(uid=new_uid).first()
self.assertIsNone(user) self.assertIsNone(user)
redirect_url = 'http://worlds.best.website/admin' redirect_url = 'http://worlds.best.website/admin'
headers = dict(Uid=new_uid) headers = dict(Uid=new_uid)
@ -61,7 +82,7 @@ class TestAuthentication(BaseTest):
rv = self.app.get('v1.0/login', follow_redirects=False, headers=headers) rv = self.app.get('v1.0/login', follow_redirects=False, headers=headers)
self.assert_success(rv) self.assert_success(rv)
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first() user = db.session.query(UserModel).filter_by(uid=new_uid).first()
self.assertIsNotNone(user) self.assertIsNotNone(user)
self.assertEqual(new_uid, user.uid) self.assertEqual(new_uid, user.uid)
self.assertEqual("Laura Barnes", user.display_name) self.assertEqual("Laura Barnes", user.display_name)
@ -70,6 +91,14 @@ class TestAuthentication(BaseTest):
# Switch production mode back off # Switch production mode back off
app.config['PRODUCTION'] = False app.config['PRODUCTION'] = False
db.session.flush()
db.session.flush()
db.session.flush()
db.session.flush()
db.session.flush()
db.session.flush()
db.session.flush()
db.session.flush()
def test_current_user_status(self): def test_current_user_status(self):
self.load_example_data() self.load_example_data()
@ -84,49 +113,65 @@ class TestAuthentication(BaseTest):
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers(user, redirect_url='http://omg.edu/lolwut')) rv = self.app.get('/v1.0/user', headers=self.logged_in_headers(user, redirect_url='http://omg.edu/lolwut'))
self.assert_success(rv) self.assert_success(rv)
def test_admin_only_endpoints(self): def test_admin_can_access_admin_only_endpoints(self):
# Switch production mode on # Switch production mode on
app.config['PRODUCTION'] = True app.config['PRODUCTION'] = True
self.load_example_data() self.load_example_data()
admin_uids = app.config['ADMIN_UIDS'] admin_uids = app.config['ADMIN_UIDS']
self.assertGreater(len(admin_uids), 0) self.assertGreater(len(admin_uids), 0)
admin_uid = admin_uids[0]
self.assertEqual(admin_uid, 'dhf8r') # This user is in the test ldap system.
admin_headers = dict(Uid=admin_uid)
for uid in admin_uids: rv = self.app.get('v1.0/login', follow_redirects=False, headers=admin_headers)
admin_headers = dict(Uid=uid) self.assert_success(rv)
rv = self.app.get( admin_user = db.session.query(UserModel).filter(UserModel.uid == admin_uid).first()
'v1.0/login', self.assertIsNotNone(admin_user)
follow_redirects=False, self.assertEqual(admin_uid, admin_user.uid)
headers=admin_headers
)
self.assert_success(rv)
admin_user = db.session.query(UserModel).filter_by(uid=uid).first() admin_study = self._make_fake_study(admin_uid)
self.assertIsNotNone(admin_user)
admin_study = self._make_fake_study(uid) admin_token_headers = dict(Authorization='Bearer ' + admin_user.encode_auth_token().decode())
rv_add_study = self.app.post( rv_add_study = self.app.post(
'/v1.0/study', '/v1.0/study',
content_type="application/json", content_type="application/json",
headers=admin_headers, headers=admin_token_headers,
data=json.dumps(StudySchema().dump(admin_study)) data=json.dumps(StudySchema().dump(admin_study)),
) follow_redirects=False
self.assert_success(rv_add_study, 'Admin user should be able to add a study') )
self.assert_success(rv_add_study, 'Admin user should be able to add a study')
new_study = json.loads(rv.get_data(as_text=True)) new_admin_study = json.loads(rv_add_study.get_data(as_text=True))
db_admin_study = db.session.query(StudyModel).filter_by(id=new_admin_study['id']).first()
self.assertIsNotNone(db_admin_study)
rv_del_study = self.app.delete( rv_del_study = self.app.delete(
'/v1.0/study/%i' % new_study.id, '/v1.0/study/%i' % db_admin_study.id,
follow_redirects=False, follow_redirects=False,
headers=admin_headers headers=admin_token_headers
) )
self.assert_success(rv_del_study, 'Admin user should be able to delete a study') self.assert_success(rv_del_study, 'Admin user should be able to delete a study')
# Switch production mode back off
app.config['PRODUCTION'] = False
def test_nonadmin_cannot_access_admin_only_endpoints(self):
# Switch production mode on
app.config['PRODUCTION'] = True
self.load_example_data()
# Non-admin user should not be able to delete a study # Non-admin user should not be able to delete a study
non_admin_uid = 'lb3dp' non_admin_uid = 'lb3dp'
admin_uids = app.config['ADMIN_UIDS']
self.assertGreater(len(admin_uids), 0)
self.assertNotIn(non_admin_uid, admin_uids)
non_admin_headers = dict(Uid=non_admin_uid) non_admin_headers = dict(Uid=non_admin_uid)
rv = self.app.get( rv = self.app.get(
@ -138,24 +183,29 @@ class TestAuthentication(BaseTest):
non_admin_user = db.session.query(UserModel).filter_by(uid=non_admin_uid).first() non_admin_user = db.session.query(UserModel).filter_by(uid=non_admin_uid).first()
self.assertIsNotNone(non_admin_user) self.assertIsNotNone(non_admin_user)
non_admin_token_headers = dict(Authorization='Bearer ' + non_admin_user.encode_auth_token().decode())
non_admin_study = self._make_fake_study(non_admin_uid) non_admin_study = self._make_fake_study(non_admin_uid)
rv_add_study = self.app.post( rv_add_study = self.app.post(
'/v1.0/study', '/v1.0/study',
content_type="application/json", content_type="application/json",
headers=non_admin_headers, headers=non_admin_token_headers,
data=json.dumps(StudySchema().dump(non_admin_study)) data=json.dumps(StudySchema().dump(non_admin_study))
) )
self.assert_success(rv_add_study, 'Non-admin user should be able to add a study') self.assert_success(rv_add_study, 'Non-admin user should be able to add a study')
new_study = json.loads(rv.get_data(as_text=True)) new_non_admin_study = json.loads(rv_add_study.get_data(as_text=True))
db_non_admin_study = db.session.query(StudyModel).filter_by(id=new_non_admin_study['id']).first()
self.assertIsNotNone(db_non_admin_study)
rv_del_study = self.app.delete( rv_non_admin_del_study = self.app.delete(
'/v1.0/study/%i' % new_study.id, '/v1.0/study/%i' % db_non_admin_study.id,
follow_redirects=False, follow_redirects=False,
headers=non_admin_headers headers=non_admin_token_headers
) )
self.assert_failure(rv_del_study, 'Non-admin user should not be able to delete a study') self.assert_failure(rv_non_admin_del_study, 401)
# Switch production mode back off # Switch production mode back off
app.config['PRODUCTION'] = False app.config['PRODUCTION'] = False