Building out a user service for getting the current user, it will provide a number of functions, one of which will allow administrative users to impersonate other users in some circumstances (but will assure that we log events correctly when an impersonation occures)

This commit is contained in:
Dan Funk 2020-07-27 14:38:57 -04:00
parent 2979a4ef5b
commit 452f2c3723
9 changed files with 80 additions and 37 deletions

View File

@ -18,6 +18,9 @@ Make sure all of the following are properly installed on your system:
- [Install pipenv](https://pipenv-es.readthedocs.io/es/stable/)
- [Add ${HOME}/.local/bin to your PATH](https://github.com/pypa/pipenv/issues/2122#issue-319600584)
### Running Postgres
### Project Initialization
1. Clone this repository.
2. In PyCharm:

View File

@ -1,5 +1,6 @@
from SpiffWorkflow import WorkflowException
from SpiffWorkflow.exceptions import WorkflowTaskExecException
from flask import g
from crc import ma, app
@ -60,3 +61,5 @@ class ApiErrorSchema(ma.Schema):
def handle_invalid_usage(error):
response = ApiErrorSchema().dump(error)
return response, error.status_code

View File

@ -8,6 +8,7 @@ from crc.api.common import ApiError, ApiErrorSchema
from crc.models.protocol_builder import ProtocolBuilderStatus
from crc.models.study import StudySchema, StudyModel, Study
from crc.services.study_service import StudyService
from crc.services.user_service import UserService
def add_study(body):
@ -17,7 +18,7 @@ def add_study(body):
if 'title' not in body:
raise ApiError("missing_title", "Can't create a new study without a title.")
study_model = StudyModel(user_uid=g.user.uid,
study_model = StudyModel(user_uid=UserService.current_user().uid,
title=body['title'],
primary_investigator_id=body['primary_investigator_id'],
last_updated=datetime.now(),
@ -65,8 +66,9 @@ def delete_study(study_id):
def user_studies():
"""Returns all the studies associated with the current user. """
StudyService.synch_with_protocol_builder_if_enabled(g.user)
studies = StudyService.get_studies_for_user(g.user)
user = UserService.current_user(allow_admin_impersonate=True)
StudyService.synch_with_protocol_builder_if_enabled(user)
studies = StudyService.get_studies_for_user(user)
results = StudySchema(many=True).dump(studies)
return results

View File

@ -63,13 +63,15 @@ def verify_token(token=None):
# Fall back to a default user if this is not production.
g.user = UserModel.query.first()
token = g.user.encode_auth_token()
token_info = UserModel.decode_auth_token(token)
return token_info
def verify_token_admin(token=None):
"""
Verifies the token for the user (if provided) in non-production environment. If in production environment,
checks that the user is in the list of authorized admins
Verifies the token for the user (if provided) in non-production environment.
If in production environment, checks that the user is in the list of authorized admins
Args:
token: Optional[str]
@ -77,18 +79,11 @@ def verify_token_admin(token=None):
Returns:
token: str
"""
# If this is production, check that the user is in the list of admins
if _is_production():
uid = _get_request_uid(request)
if uid is not None and uid in app.config['ADMIN_UIDS']:
return verify_token()
# If we're not in production, just use the normal verify_token method
else:
return verify_token(token)
verify_token(token)
if "user" in g and g.user.is_admin():
token = g.user.encode_auth_token()
token_info = UserModel.decode_auth_token(token)
return token_info
def get_current_user():
return UserModelSchema().dump(g.user)

View File

@ -13,6 +13,7 @@ from crc.models.workflow import WorkflowModel, WorkflowSpecModelSchema, Workflow
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
from crc.services.user_service import UserService
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
@ -104,8 +105,10 @@ def get_workflow(workflow_id, soft_reset=False, hard_reset=False):
def get_task_events(action):
"""Provides a way to see a history of what has happened, or get a list of tasks that need your attention."""
query = session.query(TaskEventModel).filter(TaskEventModel.user_uid == g.user.uid)
"""Provides a way to see a history of what has happened, or get a list of
tasks that need your attention."""
user = UserService.current_user(allow_admin_impersonate=True)
query = session.query(TaskEventModel).filter(TaskEventModel.user_uid == user.uid)
if action:
query = query.filter(TaskEventModel.action == action)
events = query.all()
@ -130,7 +133,7 @@ def set_current_task(workflow_id, task_id):
task_id = uuid.UUID(task_id)
spiff_task = processor.bpmn_workflow.get_task(task_id)
_verify_user_and_role(processor, spiff_task)
user_uid = g.user.uid
user_uid = UserService.current_user(allow_admin_impersonate=True).uid
if spiff_task.state != spiff_task.COMPLETED and spiff_task.state != spiff_task.READY:
raise ApiError("invalid_state", "You may not move the token to a task who's state is not "
"currently set to COMPLETE or READY.")
@ -173,7 +176,8 @@ def update_task(workflow_id, task_id, body, terminate_loop=None):
processor.save()
# Log the action, and any pending task assignments in the event of lanes in the workflow.
WorkflowService.log_task_action(g.user.uid, processor, spiff_task, WorkflowService.TASK_ACTION_COMPLETE)
user = UserService.current_user(allow_admin_impersonate=False) # Always log as the real user.
WorkflowService.log_task_action(user.uid, processor, spiff_task, WorkflowService.TASK_ACTION_COMPLETE)
WorkflowService.update_task_assignments(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
@ -233,19 +237,11 @@ def lookup(workflow_id, field_id, query=None, value=None, limit=10):
def _verify_user_and_role(processor, spiff_task):
"""Assures the currently logged in user can access the given workflow and task, or
raises an error.
Allow administrators to modify tasks, otherwise assure that the current user
is allowed to edit or update the task. Will raise the appropriate error if user
is not authorized. """
if 'user' not in g:
raise ApiError("logged_out", "You are no longer logged in.", status_code=401)
if g.user.uid in app.config['ADMIN_UIDS']:
return g.user.uid
raises an error. """
user = UserService.current_user(allow_admin_impersonate=True)
allowed_users = WorkflowService.get_users_assigned_to_task(processor, spiff_task)
if g.user.uid not in allowed_users:
if user.uid not in allowed_users:
raise ApiError.from_task("permission_denied",
f"This task must be completed by '{allowed_users}', "
f"but you are {g.user.uid}", spiff_task)
f"but you are {user.uid}", spiff_task)

View File

@ -18,9 +18,12 @@ class UserModel(db.Model):
first_name = db.Column(db.String, nullable=True)
last_name = db.Column(db.String, nullable=True)
title = db.Column(db.String, nullable=True)
# TODO: Add Department and School
def is_admin(self):
# Currently admin abilities are set in the configuration, but this
# may change in the future.
return self.uid in app.config['ADMIN_UIDS']
def encode_auth_token(self):
"""

View File

@ -0,0 +1,37 @@
from flask import g
from crc.api.common import ApiError
class UserService(object):
"""Provides common tools for working with users"""
@staticmethod
def has_user():
if 'user' not in g or not g.user:
return False
else:
return True
@staticmethod
def current_user(allow_admin_impersonate=False):
if not UserService.has_user():
raise ApiError("logged_out", "You are no longer logged in.", status_code=401)
# Admins can pretend to be different users and act on a users behalf in
# some circumstances.
if g.user.is_admin() and allow_admin_impersonate and "impersonate_user" in g:
return g.impersonate_user
else:
return g.user
@staticmethod
def in_list(uids, allow_admin_impersonate=False):
"""Returns true if the current user's id is in the given list of ids. False if there
is no user, or the user is not in the list."""
if UserService.has_user(): # If someone is logged in, lock tasks that don't belong to them.
user = UserService.current_user(allow_admin_impersonate)
if user.uid in uids:
return True
return False

View File

@ -30,6 +30,7 @@ from crc.models.workflow import WorkflowModel, WorkflowStatus, WorkflowSpecModel
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
from crc.services.user_service import UserService
from crc.services.workflow_processor import WorkflowProcessor
@ -239,7 +240,7 @@ class WorkflowService(object):
nav_item['title'] = nav_item['task'].title # Prefer the task title.
user_uids = WorkflowService.get_users_assigned_to_task(processor, spiff_task)
if 'user' not in g or not g.user or g.user.uid not in user_uids:
if not UserService.in_list(user_uids, allow_admin_impersonate=True):
nav_item['state'] = WorkflowService.TASK_STATE_LOCKED
else:
@ -272,7 +273,7 @@ class WorkflowService(object):
workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
# Update the state of the task to locked if the current user does not own the task.
user_uids = WorkflowService.get_users_assigned_to_task(processor, next_task)
if 'user' not in g or not g.user or g.user.uid not in user_uids:
if not UserService.in_list(user_uids, allow_admin_impersonate=True):
workflow_api.next_task.state = WorkflowService.TASK_STATE_LOCKED
return workflow_api

View File

@ -2,6 +2,8 @@
# IMPORTANT - Environment must be loaded before app, models, etc....
import os
from crc.services.user_service import UserService
os.environ["TESTING"] = "true"
import json
@ -118,7 +120,8 @@ class BaseTest(unittest.TestCase):
self.assertIsNotNone(user_model.display_name)
self.assertEqual(user_model.uid, uid)
self.assertTrue('user' in g, 'User should be in Flask globals')
self.assertEqual(uid, g.user.uid, 'Logged in user should match given user uid')
user = UserService.current_user(allow_admin_impersonate=True)
self.assertEqual(uid, user.uid, 'Logged in user should match given user uid')
return dict(Authorization='Bearer ' + user_model.encode_auth_token().decode())