2020-05-31 20:49:39 +00:00
|
|
|
import json
|
2020-06-11 15:29:58 +00:00
|
|
|
from calendar import timegm
|
|
|
|
from datetime import timezone, datetime, timedelta
|
2020-05-31 21:18:07 +00:00
|
|
|
from tests.base_test import BaseTest
|
2020-05-25 16:29:05 +00:00
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
from crc import db, app
|
2020-06-11 15:29:58 +00:00
|
|
|
from crc.models.study import StudySchema, StudyModel
|
2020-02-18 21:38:56 +00:00
|
|
|
from crc.models.user import UserModel
|
2020-05-31 21:18:07 +00:00
|
|
|
from crc.models.protocol_builder import ProtocolBuilderStatus
|
2020-02-18 21:38:56 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestAuthentication(BaseTest):
|
|
|
|
|
2020-05-31 21:18:07 +00:00
|
|
|
def tearDown(self):
|
|
|
|
# Assure we set the production flag back to false.
|
|
|
|
app.config['PRODUCTION'] = False
|
|
|
|
super().tearDown()
|
|
|
|
|
2020-02-18 21:38:56 +00:00
|
|
|
def test_auth_token(self):
|
2020-06-11 15:29:58 +00:00
|
|
|
# Save the orginal timeout setting
|
|
|
|
orig_ttl = float(app.config['TOKEN_AUTH_TTL_HOURS'])
|
|
|
|
|
2020-02-20 20:35:07 +00:00
|
|
|
self.load_example_data()
|
2020-06-11 15:29:58 +00:00
|
|
|
|
|
|
|
# Set the timeout to something else
|
|
|
|
new_ttl = 4.0
|
|
|
|
app.config['TOKEN_AUTH_TTL_HOURS'] = new_ttl
|
2020-02-18 21:38:56 +00:00
|
|
|
user = UserModel(uid="dhf8r")
|
2020-06-11 15:29:58 +00:00
|
|
|
expected_exp_1 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
|
|
|
|
auth_token_1 = user.encode_auth_token()
|
|
|
|
self.assertTrue(isinstance(auth_token_1, bytes))
|
|
|
|
self.assertEqual("dhf8r", user.decode_auth_token(auth_token_1).get("sub"))
|
|
|
|
actual_exp_1 = user.decode_auth_token(auth_token_1).get("exp")
|
|
|
|
self.assertTrue(expected_exp_1 - 1000 <= actual_exp_1 <= expected_exp_1 + 1000)
|
|
|
|
|
|
|
|
# Set the timeout back to where it was
|
|
|
|
app.config['TOKEN_AUTH_TTL_HOURS'] = orig_ttl
|
|
|
|
expected_exp_2 = timegm((datetime.utcnow() + timedelta(hours=new_ttl)).utctimetuple())
|
|
|
|
auth_token_2 = user.encode_auth_token()
|
|
|
|
self.assertTrue(isinstance(auth_token_2, bytes))
|
|
|
|
actual_exp_2 = user.decode_auth_token(auth_token_1).get("exp")
|
|
|
|
self.assertTrue(expected_exp_2 - 1000 <= actual_exp_2 <= expected_exp_2 + 1000)
|
2020-02-18 21:38:56 +00:00
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
def test_non_production_auth_creates_user(self):
|
|
|
|
new_uid = 'lb3dp' ## Assure this user id is in the fake responses from ldap.
|
2020-02-20 20:35:07 +00:00
|
|
|
self.load_example_data()
|
2020-02-27 15:30:16 +00:00
|
|
|
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
|
2020-02-18 21:38:56 +00:00
|
|
|
self.assertIsNone(user)
|
|
|
|
|
2020-02-27 15:30:16 +00:00
|
|
|
user_info = {'uid': new_uid, 'first_name': 'Cordi', 'last_name': 'Nator',
|
2020-05-31 20:49:39 +00:00
|
|
|
'email_address': 'czn1z@virginia.edu'}
|
2020-02-24 21:59:16 +00:00
|
|
|
redirect_url = 'http://worlds.best.website/admin'
|
|
|
|
query_string = self.user_info_to_query_string(user_info, redirect_url)
|
2020-05-31 20:49:39 +00:00
|
|
|
url = '/v1.0/login%s' % query_string
|
2020-02-21 17:03:14 +00:00
|
|
|
rv_1 = self.app.get(url, follow_redirects=False)
|
2020-02-20 22:23:10 +00:00
|
|
|
self.assertTrue(rv_1.status_code == 302)
|
2020-02-24 21:59:16 +00:00
|
|
|
self.assertTrue(str.startswith(rv_1.location, redirect_url))
|
2020-02-20 22:23:10 +00:00
|
|
|
|
2020-02-27 15:30:16 +00:00
|
|
|
user = db.session.query(UserModel).filter(UserModel.uid == new_uid).first()
|
2020-02-18 21:38:56 +00:00
|
|
|
self.assertIsNotNone(user)
|
|
|
|
self.assertIsNotNone(user.display_name)
|
|
|
|
self.assertIsNotNone(user.email_address)
|
|
|
|
|
2020-02-20 22:23:10 +00:00
|
|
|
# Hitting the same endpoint again with the same info should not cause an error
|
2020-02-21 17:03:14 +00:00
|
|
|
rv_2 = self.app.get(url, follow_redirects=False)
|
2020-02-24 21:59:16 +00:00
|
|
|
self.assertTrue(rv_2.status_code == 302)
|
|
|
|
self.assertTrue(str.startswith(rv_2.location, redirect_url))
|
2020-02-20 22:23:10 +00:00
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
def test_production_auth_creates_user(self):
|
2020-06-11 15:29:58 +00:00
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
# Switch production mode on
|
|
|
|
app.config['PRODUCTION'] = True
|
|
|
|
|
2020-05-22 13:50:18 +00:00
|
|
|
self.load_example_data()
|
2020-06-11 15:29:58 +00:00
|
|
|
|
|
|
|
new_uid = 'lb3dp' # This user is in the test ldap system.
|
|
|
|
user = db.session.query(UserModel).filter_by(uid=new_uid).first()
|
2020-05-22 13:50:18 +00:00
|
|
|
self.assertIsNone(user)
|
|
|
|
redirect_url = 'http://worlds.best.website/admin'
|
|
|
|
headers = dict(Uid=new_uid)
|
2020-06-11 15:49:07 +00:00
|
|
|
db.session.flush()
|
2020-05-23 18:49:02 +00:00
|
|
|
rv = self.app.get('v1.0/login', follow_redirects=False, headers=headers)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-05-22 13:50:18 +00:00
|
|
|
self.assert_success(rv)
|
2020-06-11 15:29:58 +00:00
|
|
|
user = db.session.query(UserModel).filter_by(uid=new_uid).first()
|
2020-05-22 13:50:18 +00:00
|
|
|
self.assertIsNotNone(user)
|
2020-06-05 18:08:46 +00:00
|
|
|
self.assertEqual(new_uid, user.uid)
|
|
|
|
self.assertEqual("Laura Barnes", user.display_name)
|
|
|
|
self.assertEqual("lb3dp@virginia.edu", user.email_address)
|
|
|
|
self.assertEqual("E0:Associate Professor of Systems and Information Engineering", user.title)
|
2020-05-22 13:50:18 +00:00
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
# Switch production mode back off
|
|
|
|
app.config['PRODUCTION'] = False
|
2020-06-11 15:49:07 +00:00
|
|
|
|
2020-05-22 13:50:18 +00:00
|
|
|
|
2020-02-18 21:38:56 +00:00
|
|
|
def test_current_user_status(self):
|
2020-02-20 20:35:07 +00:00
|
|
|
self.load_example_data()
|
2020-02-18 21:38:56 +00:00
|
|
|
rv = self.app.get('/v1.0/user')
|
|
|
|
self.assert_failure(rv, 401)
|
|
|
|
|
|
|
|
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
2020-02-20 20:35:07 +00:00
|
|
|
|
2020-05-25 16:29:05 +00:00
|
|
|
# User must exist in the mock ldap responses.
|
|
|
|
user = UserModel(uid="dhf8r", first_name='Dan', last_name='Funk', email_address='dhf8r@virginia.edu')
|
2020-02-24 21:59:16 +00:00
|
|
|
rv = self.app.get('/v1.0/user', headers=self.logged_in_headers(user, redirect_url='http://omg.edu/lolwut'))
|
2020-02-20 20:35:07 +00:00
|
|
|
self.assert_success(rv)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
def test_admin_can_access_admin_only_endpoints(self):
|
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
# Switch production mode on
|
|
|
|
app.config['PRODUCTION'] = True
|
2020-06-11 15:29:58 +00:00
|
|
|
|
2020-05-31 22:01:08 +00:00
|
|
|
self.load_example_data()
|
2020-05-31 20:49:39 +00:00
|
|
|
|
|
|
|
admin_uids = app.config['ADMIN_UIDS']
|
|
|
|
self.assertGreater(len(admin_uids), 0)
|
2020-06-11 15:29:58 +00:00
|
|
|
admin_uid = admin_uids[0]
|
|
|
|
self.assertEqual(admin_uid, 'dhf8r') # This user is in the test ldap system.
|
|
|
|
admin_headers = dict(Uid=admin_uid)
|
|
|
|
|
|
|
|
rv = self.app.get('v1.0/login', follow_redirects=False, headers=admin_headers)
|
|
|
|
self.assert_success(rv)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
admin_user = db.session.query(UserModel).filter(UserModel.uid == admin_uid).first()
|
|
|
|
self.assertIsNotNone(admin_user)
|
|
|
|
self.assertEqual(admin_uid, admin_user.uid)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
admin_study = self._make_fake_study(admin_uid)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
admin_token_headers = dict(Authorization='Bearer ' + admin_user.encode_auth_token().decode())
|
|
|
|
|
|
|
|
rv_add_study = self.app.post(
|
|
|
|
'/v1.0/study',
|
|
|
|
content_type="application/json",
|
|
|
|
headers=admin_token_headers,
|
|
|
|
data=json.dumps(StudySchema().dump(admin_study)),
|
|
|
|
follow_redirects=False
|
|
|
|
)
|
|
|
|
self.assert_success(rv_add_study, 'Admin user should be able to add a study')
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
new_admin_study = json.loads(rv_add_study.get_data(as_text=True))
|
|
|
|
db_admin_study = db.session.query(StudyModel).filter_by(id=new_admin_study['id']).first()
|
|
|
|
self.assertIsNotNone(db_admin_study)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
rv_del_study = self.app.delete(
|
|
|
|
'/v1.0/study/%i' % db_admin_study.id,
|
|
|
|
follow_redirects=False,
|
|
|
|
headers=admin_token_headers
|
|
|
|
)
|
|
|
|
self.assert_success(rv_del_study, 'Admin user should be able to delete a study')
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
# Switch production mode back off
|
|
|
|
app.config['PRODUCTION'] = False
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
def test_nonadmin_cannot_access_admin_only_endpoints(self):
|
|
|
|
# Switch production mode on
|
|
|
|
app.config['PRODUCTION'] = True
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
self.load_example_data()
|
2020-05-31 20:49:39 +00:00
|
|
|
|
|
|
|
# Non-admin user should not be able to delete a study
|
|
|
|
non_admin_uid = 'lb3dp'
|
2020-06-11 15:29:58 +00:00
|
|
|
admin_uids = app.config['ADMIN_UIDS']
|
|
|
|
self.assertGreater(len(admin_uids), 0)
|
|
|
|
self.assertNotIn(non_admin_uid, admin_uids)
|
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
non_admin_headers = dict(Uid=non_admin_uid)
|
|
|
|
|
|
|
|
rv = self.app.get(
|
|
|
|
'v1.0/login',
|
|
|
|
follow_redirects=False,
|
|
|
|
headers=non_admin_headers
|
|
|
|
)
|
|
|
|
self.assert_success(rv)
|
|
|
|
|
|
|
|
non_admin_user = db.session.query(UserModel).filter_by(uid=non_admin_uid).first()
|
|
|
|
self.assertIsNotNone(non_admin_user)
|
2020-06-11 15:29:58 +00:00
|
|
|
|
|
|
|
non_admin_token_headers = dict(Authorization='Bearer ' + non_admin_user.encode_auth_token().decode())
|
|
|
|
|
2020-05-31 20:49:39 +00:00
|
|
|
non_admin_study = self._make_fake_study(non_admin_uid)
|
|
|
|
|
|
|
|
rv_add_study = self.app.post(
|
|
|
|
'/v1.0/study',
|
|
|
|
content_type="application/json",
|
2020-06-11 15:29:58 +00:00
|
|
|
headers=non_admin_token_headers,
|
2020-05-31 20:49:39 +00:00
|
|
|
data=json.dumps(StudySchema().dump(non_admin_study))
|
|
|
|
)
|
|
|
|
self.assert_success(rv_add_study, 'Non-admin user should be able to add a study')
|
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
new_non_admin_study = json.loads(rv_add_study.get_data(as_text=True))
|
|
|
|
db_non_admin_study = db.session.query(StudyModel).filter_by(id=new_non_admin_study['id']).first()
|
|
|
|
self.assertIsNotNone(db_non_admin_study)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
2020-06-11 15:29:58 +00:00
|
|
|
rv_non_admin_del_study = self.app.delete(
|
|
|
|
'/v1.0/study/%i' % db_non_admin_study.id,
|
2020-05-31 20:49:39 +00:00
|
|
|
follow_redirects=False,
|
2020-06-11 15:29:58 +00:00
|
|
|
headers=non_admin_token_headers
|
2020-05-31 20:49:39 +00:00
|
|
|
)
|
2020-06-11 15:29:58 +00:00
|
|
|
self.assert_failure(rv_non_admin_del_study, 401)
|
2020-05-31 20:49:39 +00:00
|
|
|
|
|
|
|
# Switch production mode back off
|
|
|
|
app.config['PRODUCTION'] = False
|
|
|
|
|
|
|
|
|
|
|
|
def _make_fake_study(self, uid):
|
|
|
|
return {
|
|
|
|
"title": "blah",
|
|
|
|
"last_updated": datetime.now(tz=timezone.utc),
|
|
|
|
"protocol_builder_status": ProtocolBuilderStatus.ACTIVE,
|
|
|
|
"primary_investigator_id": uid,
|
|
|
|
"user_uid": uid,
|
|
|
|
}
|