From 1f6f20a734077d351385f411b6a0c4e2de73e885 Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 22 Dec 2022 09:57:13 -0500 Subject: [PATCH] added ALL macro for easier use with admin groups and some failure test cases --- .../services/authorization_service.py | 47 +++++++++++++---- .../unit/test_authorization_service.py | 52 ++++++++++++++++++- 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py index 80fbc627a..f4a3adeb8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py @@ -47,6 +47,10 @@ class UserDoesNotHaveAccessToTaskError(Exception): """UserDoesNotHaveAccessToTaskError.""" +class InvalidPermissionError(Exception): + pass + + @dataclass class PermissionToAssign: permission: str @@ -531,22 +535,22 @@ class AuthorizationService: return user_model # type: ignore @classmethod - def get_permissions_to_assign(cls, permission: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]: - permissions = [permission] - if permission == "all": + def get_permissions_to_assign(cls, permission_set: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]: + permissions = permission_set.split(',') + if permission_set == "all": permissions = ['create', 'read', 'update', 'delete'] permissions_to_assign: list[PermissionToAssign] = [] # we were thinking that if you can start an instance, you ought to be able to view your own instances. - if permission == "start": + if permission_set == "start": target_uri = f"/process-instances/{process_related_path_segment}" permissions_to_assign.append(PermissionToAssign(permission='create', target_uri=target_uri)) target_uri = f"/process-instances/for-me/{process_related_path_segment}" permissions_to_assign.append(PermissionToAssign(permission='read', target_uri=target_uri)) else: - if permission == 'all': + if permission_set == 'all': for path_segment in PATH_SEGMENTS_FOR_PERMISSION_ALL: target_uris.append(f"{path_segment}/{process_related_path_segment}") @@ -557,11 +561,13 @@ class AuthorizationService: return permissions_to_assign @classmethod - def explode_permissions(cls, permission: str, target: str) -> list[PermissionToAssign]: + def explode_permissions(cls, permission_set: str, target: str) -> list[PermissionToAssign]: """Explodes given permissions to and returns list of PermissionToAssign objects. These can be used to then iterate through and inserted into the database. Target Macros: + ALL + * gives access to ALL api endpoints - useful to give admin-like permissions PG:[process_group_identifier] * affects given process-group and all sub process-groups and process-models PM:[process_model_identifier] @@ -570,17 +576,24 @@ class AuthorizationService: * Basic access to complete tasks and use the site Permission Macros: - all - create, read, update, delete - start - create process-instances (aka instantiate or start a process-model) + all + * create, read, update, delete + start + * create process-instances (aka instantiate or start a process-model) + * only works with PG and PM target macros """ permissions_to_assign: list[PermissionToAssign] = [] + permissions = permission_set.split(',') + if permission_set == "all": + permissions = ['create', 'read', 'update', 'delete'] + if target.startswith("PG:"): process_group_identifier = target.removeprefix("PG:").replace(":", "/").removeprefix('/') process_related_path_segment = f"{process_group_identifier}/*" if process_group_identifier == "ALL": process_related_path_segment = "*" target_uris = [f"/process-groups/{process_related_path_segment}", f"/process-models/{process_related_path_segment}"] - permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris) + permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris) elif target.startswith("PM:"): process_model_identifier = target.removeprefix("PM:").replace(":", "/").removeprefix('/') @@ -590,7 +603,10 @@ class AuthorizationService: process_related_path_segment = "*" target_uris = [f"/process-models/{process_related_path_segment}"] - permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris) + permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris) + + elif permission_set == "start": + raise InvalidPermissionError("Permission 'start' is only available for macros PM and PG.") elif target.startswith("BASIC"): permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/process-instances/for-me")) @@ -601,8 +617,17 @@ class AuthorizationService: for permission in ['create', 'read', 'update', 'delete']: permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/process-instances/reports/*")) permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/tasks/*")) + elif target == "ALL": + for permission in permissions: + permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri='/*')) + elif target.startswith('/'): + for permission in permissions: + permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target)) else: - permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target)) + raise InvalidPermissionError( + f"Target uri '{target}' with permission set '{permission_set}' is invalid. " + f"The target uri must either be a macro of PG, PM, BASIC, or ALL or an api uri." + ) return permissions_to_assign diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py index 035d779dd..37a366aad 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py @@ -6,7 +6,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserNotFoundError -from spiffworkflow_backend.services.authorization_service import AuthorizationService +from spiffworkflow_backend.services.authorization_service import AuthorizationService, InvalidPermissionError from spiffworkflow_backend.services.process_instance_processor import ( ProcessInstanceProcessor, ) @@ -272,3 +272,53 @@ class TestAuthorizationService(BaseTest): permissions_to_assign = AuthorizationService.explode_permissions('all', 'BASIC') permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign]) assert permissions_to_assign_tuples == expected_permissions + + def test_explode_permissions_all( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + expected_permissions = [ + ('/*', 'create'), + ('/*', 'delete'), + ('/*', 'read'), + ('/*', 'update'), + ] + permissions_to_assign = AuthorizationService.explode_permissions('all', 'ALL') + permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign]) + assert permissions_to_assign_tuples == expected_permissions + + def test_explode_permissions_with_target_uri( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + expected_permissions = [ + ('/hey/model', 'create'), + ('/hey/model', 'delete'), + ('/hey/model', 'read'), + ('/hey/model', 'update'), + ] + permissions_to_assign = AuthorizationService.explode_permissions('all', '/hey/model') + permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign]) + assert permissions_to_assign_tuples == expected_permissions + + def test_explode_permissions_with_invalid_target_uri( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with pytest.raises(InvalidPermissionError): + AuthorizationService.explode_permissions('all', 'BAD_MACRO') + + def test_explode_permissions_with_start_to_incorrect_target( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with pytest.raises(InvalidPermissionError): + AuthorizationService.explode_permissions('start', '/hey/model')