diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_all_permissions.py b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_all_permissions.py index 83a7e582..7cdcf360 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_all_permissions.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/get_all_permissions.py @@ -12,9 +12,6 @@ from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.scripts.script import Script -# add_permission("read", "test/*", "Editors") - - class GetAllPermissions(Script): """GetAllPermissions.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py index b7e46dd6..8c97fe60 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py @@ -7,10 +7,9 @@ from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.services.authorization_service import AuthorizationService -# add_permission("read", "test/*", "Editors") - class RecreatePermissions(Script): + """RecreatePermissions.""" def get_description(self) -> str: """Get_description.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py index 6ab240d0..cd125ee5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py @@ -1,13 +1,14 @@ """Authorization_service.""" import inspect -from typing import TypedDict -from typing import Any, Set import re from dataclasses import dataclass from hashlib import sha256 from hmac import compare_digest from hmac import HMAC +from typing import Any from typing import Optional +from typing import Set +from typing import TypedDict from typing import Union import jwt @@ -23,7 +24,6 @@ from sqlalchemy import or_ from sqlalchemy import text from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX -from spiffworkflow_backend.models import permission_assignment from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel @@ -72,6 +72,8 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [ class DesiredPermissionDict(TypedDict): + """DesiredPermissionDict.""" + group_identifiers: Set[str] permission_assignments: list[PermissionAssignmentModel] @@ -236,9 +238,13 @@ class AuthorizationService: for group_identifier in permission_config["groups"]: group = GroupService.find_or_create_group(group_identifier) unique_user_group_identifiers.add(group_identifier) - permission_assignments.append(cls.create_permission_for_principal( - group.principal, permission_target, allowed_permission - )) + permission_assignments.append( + cls.create_permission_for_principal( + group.principal, + permission_target, + allowed_permission, + ) + ) if "users" in permission_config: for username in permission_config["users"]: user = UserModel.query.filter_by(username=username).first() @@ -248,15 +254,20 @@ class AuthorizationService: .filter(UserModel.username == username) .first() ) - permission_assignments.append(cls.create_permission_for_principal( - principal, permission_target, allowed_permission - )) + permission_assignments.append( + cls.create_permission_for_principal( + principal, permission_target, allowed_permission + ) + ) if default_group is not None: for user in UserModel.query.all(): cls.associate_user_with_group(user, default_group) - return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments } + return { + "group_identifiers": unique_user_group_identifiers, + "permission_assignments": permission_assignments, + } @classmethod def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel: @@ -715,9 +726,11 @@ class AuthorizationService: permission_target = cls.find_or_create_permission_target( permission_to_assign.target_uri ) - permission_assignments.append(cls.create_permission_for_principal( - group.principal, permission_target, permission_to_assign.permission - )) + permission_assignments.append( + cls.create_permission_for_principal( + group.principal, permission_target, permission_to_assign.permission + ) + ) return permission_assignments @classmethod @@ -725,24 +738,32 @@ class AuthorizationService: """Adds new permission assignments and deletes old ones.""" initial_permission_assignments = PermissionAssignmentModel.query.all() result = cls.import_permissions_from_yaml_file() - desired_permission_assignments = result['permission_assignments'] - desired_group_identifiers = result['group_identifiers'] + desired_permission_assignments = result["permission_assignments"] + desired_group_identifiers = result["group_identifiers"] for group in group_info: - for username in group['users']: - GroupService.add_user_to_group_or_add_to_waiting(username, group['name']) - for permission in group['permissions']: - for crud_op in permission['actions']: - desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro( - group_identifier=group['name'], target=permission['uri'], permission=crud_op - )) - desired_group_identifiers.add(group['name']) + for username in group["users"]: + GroupService.add_user_to_group_or_add_to_waiting( + username, group["name"] + ) + for permission in group["permissions"]: + for crud_op in permission["actions"]: + desired_permission_assignments.extend( + cls.add_permission_from_uri_or_macro( + group_identifier=group["name"], + target=permission["uri"], + permission=crud_op, + ) + ) + desired_group_identifiers.add(group["name"]) for ipa in initial_permission_assignments: if ipa not in desired_permission_assignments: db.session.delete(ipa) - groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all() + groups_to_delete = GroupModel.query.filter( + GroupModel.identifier.not_in(desired_group_identifiers) + ).all() for gtd in groups_to_delete: db.session.delete(gtd) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py index 85f6441f..911d41ac 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py @@ -1,10 +1,10 @@ """Group_service.""" from typing import Optional -from spiffworkflow_backend.models.user import UserModel from flask_bpmn.models.db import db from spiffworkflow_backend.models.group import GroupModel +from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.user_service import UserService @@ -25,7 +25,10 @@ class GroupService: return group @classmethod - def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None: + def add_user_to_group_or_add_to_waiting( + cls, username: str, group_identifier: str + ) -> None: + """Add_user_to_group_or_add_to_waiting.""" group = cls.find_or_create_group(group_identifier) user = UserModel.query.filter_by(username=username).first() if user: diff --git a/spiffworkflow-backend/tests/data/script_add_permission/add_permission.bpmn b/spiffworkflow-backend/tests/data/script_add_permission/add_permission.bpmn deleted file mode 100644 index 50a56b1f..00000000 --- a/spiffworkflow-backend/tests/data/script_add_permission/add_permission.bpmn +++ /dev/null @@ -1,39 +0,0 @@ - - - - - Flow_01cweoc - - - - Flow_1xle2yo - - - - Flow_01cweoc - Flow_1xle2yo - add_permission('read', '/test_permission_uri', "test_group") - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_permission.py b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_permission.py deleted file mode 100644 index 45ff4b25..00000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_permission.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Test_get_localtime.""" -import pytest -from flask.app import Flask -from flask.testing import FlaskClient -from flask_bpmn.api.api_error import ApiError -from tests.spiffworkflow_backend.helpers.base_test import BaseTest -from tests.spiffworkflow_backend.helpers.test_data import load_test_spec - -from spiffworkflow_backend.models.group import GroupModel -from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel -from spiffworkflow_backend.models.permission_target import PermissionTargetModel -from spiffworkflow_backend.models.script_attributes_context import ( - ScriptAttributesContext, -) -from spiffworkflow_backend.models.user import UserModel -from spiffworkflow_backend.scripts.add_permission import AddPermission -from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, -) - - -class TestAddPermission(BaseTest): - """TestAddPermission.""" - - def test_can_add_permission( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - """Test_can_get_members_of_a_group.""" - self.find_or_create_user("test_user") - - # now that we have everything, try to clear it out... - script_attributes_context = ScriptAttributesContext( - task=None, - environment_identifier="testing", - process_instance_id=1, - process_model_identifier="my_test_user", - ) - - group = GroupModel.query.filter( - GroupModel.identifier == "my_test_group" - ).first() - permission_target = PermissionTargetModel.query.filter( - PermissionTargetModel.uri == "/test_add_permission/%" - ).first() - assert group is None - assert permission_target is None - - AddPermission().run( - script_attributes_context, "read", "/test_add_permission/*", "my_test_group" - ) - group = GroupModel.query.filter( - GroupModel.identifier == "my_test_group" - ).first() - permission_target = PermissionTargetModel.query.filter( - PermissionTargetModel.uri == "/test_add_permission/%" - ).first() - permission_assignments = PermissionAssignmentModel.query.filter( - PermissionAssignmentModel.principal_id == group.principal.id - ).all() - assert group is not None - assert permission_target is not None - assert len(permission_assignments) == 1 - - def test_add_permission_script_through_bpmn( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - ) -> None: - """Test_add_permission_script_through_bpmn.""" - basic_user = self.find_or_create_user("basic_user") - privileged_user = self.find_or_create_user("privileged_user") - self.add_permissions_to_user( - privileged_user, - target_uri="/can-run-privileged-script/add_permission", - permission_names=["create"], - ) - process_model = load_test_spec( - process_model_id="add_permission", - process_model_source_directory="script_add_permission", - ) - process_instance = self.create_process_instance_from_process_model( - process_model=process_model, user=basic_user - ) - processor = ProcessInstanceProcessor(process_instance) - - with pytest.raises(ApiError) as exception: - processor.do_engine_steps(save=True) - assert "ScriptUnauthorizedForUserError" in str(exception) - - process_instance = self.create_process_instance_from_process_model( - process_model=process_model, user=privileged_user - ) - processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) - assert process_instance.status == "complete" diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_user_to_group.py b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_user_to_group.py deleted file mode 100644 index 2e665a16..00000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_add_user_to_group.py +++ /dev/null @@ -1,69 +0,0 @@ -"""Test_get_localtime.""" -from flask.app import Flask -from flask.testing import FlaskClient -from flask_bpmn.models.db import db -from tests.spiffworkflow_backend.helpers.base_test import BaseTest - -from spiffworkflow_backend.models.group import GroupModel -from spiffworkflow_backend.models.script_attributes_context import ( - ScriptAttributesContext, -) -from spiffworkflow_backend.models.user import UserModel -from spiffworkflow_backend.models.user_group_assignment_waiting import ( - UserGroupAssignmentWaitingModel, -) -from spiffworkflow_backend.scripts.add_user_to_group import AddUserToGroup - - -class TestAddUserToGroup(BaseTest): - """TestGetGroupMembers.""" - - def test_can_add_existing_user_to_existing_group( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - """Test_can_get_members_of_a_group.""" - my_user = self.find_or_create_user("my_user") - my_group = GroupModel(identifier="my_group") - db.session.add(my_group) - script_attributes_context = ScriptAttributesContext( - task=None, - environment_identifier="testing", - process_instance_id=1, - process_model_identifier="my_test_user", - ) - AddUserToGroup().run( - script_attributes_context, my_user.username, my_group.identifier - ) - assert my_user in my_group.users - - def test_can_add_non_existent_user_to_non_existent_group( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - """Test_can_add_non_existent_user_to_non_existent_group.""" - script_attributes_context = ScriptAttributesContext( - task=None, - environment_identifier="testing", - process_instance_id=1, - process_model_identifier="my_test_user", - ) - AddUserToGroup().run( - script_attributes_context, "dan@sartography.com", "competent-joes" - ) - my_group = GroupModel.query.filter( - GroupModel.identifier == "competent-joes" - ).first() - assert my_group is not None - waiting_assignments = ( - UserGroupAssignmentWaitingModel() - .query.filter_by(username="dan@sartography.com") - .first() - ) - assert waiting_assignments is not None diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_delete_permissions.py b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_delete_permissions.py deleted file mode 100644 index 41d6c015..00000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_delete_permissions.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Test_get_localtime.""" -from flask.app import Flask -from flask.testing import FlaskClient -from flask_bpmn.models.db import db -from tests.spiffworkflow_backend.helpers.base_test import BaseTest - -from spiffworkflow_backend.models.group import GroupModel -from spiffworkflow_backend.models.permission_target import PermissionTargetModel -from spiffworkflow_backend.models.script_attributes_context import ( - ScriptAttributesContext, -) -from spiffworkflow_backend.models.user import UserModel -from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions -from spiffworkflow_backend.services.authorization_service import AuthorizationService -from spiffworkflow_backend.services.group_service import GroupService -from spiffworkflow_backend.services.user_service import UserService - - -class TestDeletePermissions(BaseTest): - """TestGetGroupMembers.""" - - def test_can_delete_members( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - """Test_can_get_members_of_a_group.""" - self.find_or_create_user("initiator_user") - testuser1 = self.find_or_create_user("testuser1") - testuser2 = self.find_or_create_user("testuser2") - testuser3 = self.find_or_create_user("testuser3") - group_a = GroupService.find_or_create_group("groupA") - group_b = GroupService.find_or_create_group("groupB") - db.session.add(group_a) - db.session.add(group_b) - db.session.commit() - UserService.add_user_to_group(testuser1, group_a) - UserService.add_user_to_group(testuser2, group_a) - UserService.add_user_to_group(testuser3, group_b) - - target = PermissionTargetModel("test/*") - db.session.add(target) - db.session.commit() - AuthorizationService.create_permission_for_principal( - group_a.principal, target, "read" - ) - # now that we have everything, try to clear it out... - script_attributes_context = ScriptAttributesContext( - task=None, - environment_identifier="testing", - process_instance_id=1, - process_model_identifier="my_test_user", - ) - ClearPermissions().run(script_attributes_context) - - groups = GroupModel.query.all() - assert 0 == len(groups) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_get_all_permissions.py b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_get_all_permissions.py index 9f159499..3c3bce50 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_get_all_permissions.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/scripts/test_get_all_permissions.py @@ -7,8 +7,8 @@ from spiffworkflow_backend.models.script_attributes_context import ( ScriptAttributesContext, ) from spiffworkflow_backend.models.user import UserModel -from spiffworkflow_backend.scripts.add_permission import AddPermission from spiffworkflow_backend.scripts.get_all_permissions import GetAllPermissions +from spiffworkflow_backend.services.authorization_service import AuthorizationService class TestGetAllPermissions(BaseTest): @@ -31,10 +31,12 @@ class TestGetAllPermissions(BaseTest): process_instance_id=1, process_model_identifier="my_test_user", ) - AddPermission().run( - script_attributes_context, "start", "PG:hey:group", "my_test_group" + AuthorizationService.add_permission_from_uri_or_macro( + permission="start", target="PG:hey:group", group_identifier="my_test_group" + ) + AuthorizationService.add_permission_from_uri_or_macro( + permission="all", target="/tasks", group_identifier="my_test_group" ) - AddPermission().run(script_attributes_context, "all", "/tasks", "my_test_group") expected_permissions = [ { diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py index 7d000c9d..a0c140f1 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py @@ -1,10 +1,10 @@ """Test_message_service.""" import pytest -from spiffworkflow_backend.models.group import GroupModel from flask import Flask from flask.testing import FlaskClient from tests.spiffworkflow_backend.helpers.base_test import BaseTest +from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserNotFoundError from spiffworkflow_backend.services.authorization_service import AuthorizationService @@ -436,6 +436,7 @@ class TestAuthorizationService(BaseTest): client: FlaskClient, with_db_and_bpmn_file_cleanup: None, ) -> None: + """Test_can_refresh_permissions.""" user = self.find_or_create_user(username="user_one") admin_user = self.find_or_create_user(username="testadmin1") @@ -443,14 +444,13 @@ class TestAuthorizationService(BaseTest): GroupService.find_or_create_group("group_two") assert GroupModel.query.filter_by(identifier="group_two").first() is not None - group_info = [{ - 'users': ['user_one'], - 'name': 'group_one', - 'permissions': [{ - 'actions': ['create', 'read'], - 'uri': 'PG:hey' - }] - }] + group_info = [ + { + "users": ["user_one"], + "name": "group_one", + "permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}], + } + ] AuthorizationService.refresh_permissions(group_info) assert GroupModel.query.filter_by(identifier="group_two").first() is None assert GroupModel.query.filter_by(identifier="group_one").first() is not None @@ -459,17 +459,18 @@ class TestAuthorizationService(BaseTest): self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo") - group_info = [{ - 'users': ['user_one'], - 'name': 'group_one', - 'permissions': [{ - 'actions': ['read'], - 'uri': 'PG:hey' - }] - }] + group_info = [ + { + "users": ["user_one"], + "name": "group_one", + "permissions": [{"actions": ["read"], "uri": "PG:hey"}], + } + ] AuthorizationService.refresh_permissions(group_info) assert GroupModel.query.filter_by(identifier="group_one").first() is not None self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") - self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False) + self.assert_user_has_permission( + user, "create", "/v1.0/process-groups/hey:yo", expected_result=False + ) self.assert_user_has_permission(admin_user, "create", "/anything-they-want")