added test to ensure users can update their own task w/ burnettk

This commit is contained in:
jasquat 2022-10-21 15:19:51 -04:00
parent 6e13ee4edb
commit afdf81a031
5 changed files with 156 additions and 19 deletions

View File

@ -1,3 +1,5 @@
default_group: everybody
groups:
admin:
users: [testadmin1, testadmin2]
@ -21,8 +23,21 @@ permissions:
allowed_permissions: [read]
uri: /*
finance-admin:
tasks-crud:
groups: [everybody]
users: []
allowed_permissions: [create, read, update, delete]
uri: /v1.0/tasks/*
# TODO: all uris should really have the same structure
finance-admin-group:
groups: ["Finance Team"]
users: [testuser4]
allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-groups/finance/*
finance-admin-model:
groups: ["Finance Team"]
users: [testuser4]
allowed_permissions: [create, read, update, delete]
uri: /v1.0/process-models/finance/*

View File

@ -106,6 +106,19 @@ class AuthorizationService:
db.session.commit()
cls.import_permissions_from_yaml_file()
@classmethod
def associate_user_with_group(cls, user: UserModel, group: GroupModel) -> None:
"""Associate_user_with_group."""
user_group_assignemnt = UserGroupAssignmentModel.query.filter_by(
user_id=user.id, group_id=group.id
).first()
if user_group_assignemnt is None:
user_group_assignemnt = UserGroupAssignmentModel(
user_id=user.id, group_id=group.id
)
db.session.add(user_group_assignemnt)
db.session.commit()
@classmethod
def import_permissions_from_yaml_file(
cls, raise_if_missing_user: bool = False
@ -122,6 +135,20 @@ class AuthorizationService:
with open(current_app.config["PERMISSIONS_FILE_FULLPATH"]) as file:
permission_configs = yaml.safe_load(file)
default_group = None
if "default_group" in permission_configs:
default_group_identifier = permission_configs["default_group"]
default_group = GroupModel.query.filter_by(
identifier=default_group_identifier
).first()
if default_group is None:
default_group = GroupModel(identifier=default_group_identifier)
db.session.add(default_group)
db.session.commit()
UserService.create_principal(
default_group.id, id_column_name="group_id"
)
if "groups" in permission_configs:
for group_identifier, group_config in permission_configs["groups"].items():
group = GroupModel.query.filter_by(identifier=group_identifier).first()
@ -140,15 +167,7 @@ class AuthorizationService:
)
)
continue
user_group_assignemnt = UserGroupAssignmentModel.query.filter_by(
user_id=user.id, group_id=group.id
).first()
if user_group_assignemnt is None:
user_group_assignemnt = UserGroupAssignmentModel(
user_id=user.id, group_id=group.id
)
db.session.add(user_group_assignemnt)
db.session.commit()
cls.associate_user_with_group(user, group)
if "permissions" in permission_configs:
for _permission_identifier, permission_config in permission_configs[
@ -188,6 +207,10 @@ class AuthorizationService:
principal, permission_target, allowed_permission
)
if default_group is not None:
for user in UserModel.query.all():
cls.associate_user_with_group(user, default_group)
@classmethod
def create_permission_for_principal(
cls,
@ -295,7 +318,7 @@ class AuthorizationService:
raise ApiError(
error_code="unauthorized",
message="User is not authorized to perform requested action.",
message=f"User {g.user.username} is not authorized to perform requested action: {permission_string} - {request.path}",
status_code=403,
)

View File

@ -15,6 +15,7 @@ from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
ProcessEntityNotFoundError,
)
from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -26,6 +27,7 @@ from spiffworkflow_backend.models.process_model import NotificationType
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.task_event import TaskEventModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -1772,6 +1774,94 @@ class TestProcessApi(BaseTest):
assert response.json is not None
assert len(response.json["results"]) == 2
def test_correct_user_can_get_and_update_a_task(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_correct_user_can_get_and_update_a_task."""
initiator_user = self.find_or_create_user("testuser4")
finance_user = self.find_or_create_user("testuser2")
assert initiator_user.principal is not None
assert finance_user.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
assert finance_group is not None
process_model = load_test_spec(
process_model_id="model_with_lanes",
bpmn_file_name="lanes.bpmn",
process_group_id="finance",
)
response = self.create_process_instance(
client,
process_model.process_group_id,
process_model.id,
headers=self.logged_in_headers(initiator_user),
)
assert response.status_code == 201
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}/process-instances/{process_instance_id}/run",
headers=self.logged_in_headers(initiator_user),
)
assert response.status_code == 200
response = client.get(
"/v1.0/tasks",
headers=self.logged_in_headers(finance_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 0
response = client.get(
"/v1.0/tasks",
headers=self.logged_in_headers(initiator_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
task_id = response.json["results"][0]["id"]
assert task_id is not None
response = client.put(
f"/v1.0/tasks/{process_instance_id}/{task_id}",
headers=self.logged_in_headers(finance_user),
)
assert response.status_code == 500
assert response.json
assert "UserDoesNotHaveAccessToTaskError" in response.json["message"]
response = client.put(
f"/v1.0/tasks/{process_instance_id}/{task_id}",
headers=self.logged_in_headers(initiator_user),
)
assert response.status_code == 202
response = client.get(
"/v1.0/tasks",
headers=self.logged_in_headers(initiator_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 0
response = client.get(
"/v1.0/tasks",
headers=self.logged_in_headers(finance_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
# TODO: test the auth callback endpoint
# def test_can_store_authentication_secret(
# self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None

View File

@ -43,11 +43,13 @@ class TestAuthorizationService(BaseTest):
users[username] = user
AuthorizationService.import_permissions_from_yaml_file()
assert len(users["testadmin1"].groups) == 1
assert users["testadmin1"].groups[0].identifier == "admin"
assert len(users["testuser1"].groups) == 1
assert users["testuser1"].groups[0].identifier == "Finance Team"
assert len(users["testuser2"].groups) == 2
assert len(users["testadmin1"].groups) == 2
testadmin1_group_identifiers = sorted([g.identifier for g in users["testadmin1"].groups])
assert testadmin1_group_identifiers == ["admin", "everybody"]
assert len(users["testuser1"].groups) == 2
testuser1_group_identifiers = sorted([g.identifier for g in users["testuser1"].groups])
assert testuser1_group_identifiers == ["Finance Team", "everybody"]
assert len(users["testuser2"].groups) == 3
self.assert_user_has_permission(
users["testuser1"], "update", "/v1.0/process-groups/finance/model1"
@ -61,6 +63,7 @@ class TestAuthorizationService(BaseTest):
self.assert_user_has_permission(
users["testuser4"], "update", "/v1.0/process-groups/finance/model1"
)
# via the user, not the group
self.assert_user_has_permission(
users["testuser4"], "read", "/v1.0/process-groups/finance/model1"
)

View File

@ -83,7 +83,9 @@ class TestProcessInstanceProcessor(BaseTest):
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, finance_user
)
ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user)
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, initiator_user
)
assert len(process_instance.active_tasks) == 1
active_task = process_instance.active_tasks[0]
@ -99,7 +101,9 @@ class TestProcessInstanceProcessor(BaseTest):
processor, spiff_task, {}, initiator_user
)
ProcessInstanceService.complete_form_task(processor, spiff_task, {}, finance_user)
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, finance_user
)
assert len(process_instance.active_tasks) == 1
active_task = process_instance.active_tasks[0]
assert active_task.lane_assignment_id is None
@ -109,6 +113,8 @@ class TestProcessInstanceProcessor(BaseTest):
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
active_task.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user)
ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, initiator_user
)
assert process_instance.status == ProcessInstanceStatus.complete.value