diff --git a/src/spiffworkflow_backend/models/permission_target.py b/src/spiffworkflow_backend/models/permission_target.py index 0e576cf8..6911ecb4 100644 --- a/src/spiffworkflow_backend/models/permission_target.py +++ b/src/spiffworkflow_backend/models/permission_target.py @@ -1,4 +1,7 @@ """PermissionTarget.""" +import re +from typing import Any +from sqlalchemy.orm import validates from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel @@ -9,6 +12,9 @@ from flask_bpmn.models.db import SpiffworkflowBaseDBModel # from spiffworkflow_backend.models.process_model import ProcessModel +class InvalidPermissionTargetUri(Exception): + pass + class PermissionTargetModel(SpiffworkflowBaseDBModel): """PermissionTargetModel.""" @@ -24,3 +30,11 @@ class PermissionTargetModel(SpiffworkflowBaseDBModel): # process_group_id = db.Column(ForeignKey(ProcessGroupModel.id), nullable=True) # type: ignore # process_model_identifier = db.Column(ForeignKey(ProcessModel.id), nullable=True) # type: ignore # process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore + + @validates("uri") + def validate_uri(self, key: str, value: str) -> str: + if re.search(r"\*.", value): + raise InvalidPermissionTargetUri( + f"Invalid Permission Target Uri: {value}" + ) + return value diff --git a/src/spiffworkflow_backend/services/authorization_service.py b/src/spiffworkflow_backend/services/authorization_service.py index 0209e3f9..a439f470 100644 --- a/src/spiffworkflow_backend/services/authorization_service.py +++ b/src/spiffworkflow_backend/services/authorization_service.py @@ -1,6 +1,9 @@ """Authorization_service.""" +import re from typing import Union +from sqlalchemy import text + import jwt from flask import current_app from flask_bpmn.api.api_error import ApiError @@ -21,13 +24,14 @@ class AuthorizationService: ) -> bool: """Has_permission.""" principal_ids = [p.id for p in principals] + permission_assignments = ( PermissionAssignmentModel.query.filter( PermissionAssignmentModel.principal_id.in_(principal_ids) ) .filter_by(permission=permission) .join(PermissionTargetModel) - .filter_by(uri=target_uri) + .filter(text(f"'{target_uri}' LIKE `permission_target`.`uri`")) .all() ) diff --git a/tests/spiffworkflow_backend/unit/test_permission_target.py b/tests/spiffworkflow_backend/unit/test_permission_target.py new file mode 100644 index 00000000..d21ac799 --- /dev/null +++ b/tests/spiffworkflow_backend/unit/test_permission_target.py @@ -0,0 +1,27 @@ +"""Process Model.""" +import pytest +from flask.app import Flask +from flask_bpmn.models.db import db +from tests.spiffworkflow_backend.helpers.base_test import BaseTest + +from spiffworkflow_backend.models.permission_target import InvalidPermissionTargetUri, PermissionTargetModel + + +class TestPermissionTarget(BaseTest): + + def test_asterisk_must_go_at_the_end_of_uri( + self, app: Flask, with_db_and_bpmn_file_cleanup: None + ) -> None: + permission_target = PermissionTargetModel(uri="/test_group/*") + db.session.add(permission_target) + db.session.commit() + + permission_target = PermissionTargetModel(uri="/test_group") + db.session.add(permission_target) + db.session.commit() + + with pytest.raises(InvalidPermissionTargetUri) as exception: + PermissionTargetModel(uri="/test_group/*/model") + assert ( + str(exception.value) == "Invalid Permission Target Uri: /test_group/*/model" + ) diff --git a/tests/spiffworkflow_backend/unit/test_permissions.py b/tests/spiffworkflow_backend/unit/test_permissions.py index b3a31989..22937f77 100644 --- a/tests/spiffworkflow_backend/unit/test_permissions.py +++ b/tests/spiffworkflow_backend/unit/test_permissions.py @@ -90,7 +90,6 @@ class TestPermissions(BaseTest): def test_user_can_be_granted_access_through_a_group( self, app: Flask, with_db_and_bpmn_file_cleanup: None ) -> None: - """Test_group_a_admin_needs_to_stay_away_from_group_b.""" process_group_ids = ["group-a", "group-b"] process_group_a_id = process_group_ids[0] process_group_ids[1] @@ -129,3 +128,42 @@ class TestPermissions(BaseTest): target_uri=f"/{process_group_a_id}", ) assert has_permission_to_a is True + + def test_user_can_be_read_models_with_global_permission( + self, app: Flask, with_db_and_bpmn_file_cleanup: None + ) -> None: + process_group_ids = ["group-a", "group-b"] + process_group_a_id = process_group_ids[0] + process_group_b_id = process_group_ids[1] + for process_group_id in process_group_ids: + load_test_spec( + "timers_intermediate_catch_event", + process_group_id=process_group_id, + ) + group_a_admin = self.find_or_create_user() + + permission_target = PermissionTargetModel(uri=f"/%") + db.session.add(permission_target) + db.session.commit() + + permission_assignment = PermissionAssignmentModel( + permission_target_id=permission_target.id, + principal_id=group_a_admin.principal.id, + permission="update", + grant_type="permit", + ) + db.session.add(permission_assignment) + db.session.commit() + + has_permission_to_a = AuthorizationService.user_has_permission( + user=group_a_admin, + permission="update", + target_uri=f"/{process_group_a_id}", + ) + assert has_permission_to_a is True + has_permission_to_b = AuthorizationService.user_has_permission( + user=group_a_admin, + permission="update", + target_uri=f"/{process_group_b_id}", + ) + assert has_permission_to_b is True