From 99e8dccd6ec777d2b999af3e289b7c20ab3dccac Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 22 Dec 2022 16:14:52 -0500 Subject: [PATCH] added script to refresh permissions w/ burnettk --- .../scripts/refresh_permissions.py | 40 ++++++++++++ .../services/authorization_service.py | 62 ++++++++++++++++--- .../services/group_service.py | 10 +++ .../unit/test_authorization_service.py | 45 ++++++++++++++ 4 files changed, 148 insertions(+), 9 deletions(-) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py new file mode 100644 index 00000000..b7e46dd6 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py @@ -0,0 +1,40 @@ +"""Get_env.""" +from typing import Any + +from spiffworkflow_backend.models.script_attributes_context import ( + ScriptAttributesContext, +) +from spiffworkflow_backend.scripts.script import Script +from spiffworkflow_backend.services.authorization_service import AuthorizationService + +# add_permission("read", "test/*", "Editors") + + +class RecreatePermissions(Script): + + def get_description(self) -> str: + """Get_description.""" + return """Add permissions using a dict. + group_info: [ + { + 'name': group_identifier, + 'users': array_of_users, + 'permissions': [ + { + 'actions': array_of_actions - create, read, etc, + 'uri': target_uri + } + ] + } + ] + """ + + def run( + self, + script_attributes_context: ScriptAttributesContext, + *args: Any, + **kwargs: Any, + ) -> Any: + """Run.""" + group_info = args[0] + AuthorizationService.refresh_permissions(group_info) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py index 4ebba797..6ab240d0 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py @@ -1,5 +1,7 @@ """Authorization_service.""" import inspect +from typing import TypedDict +from typing import Any, Set import re from dataclasses import dataclass from hashlib import sha256 @@ -21,6 +23,7 @@ from sqlalchemy import or_ from sqlalchemy import text from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX +from spiffworkflow_backend.models import permission_assignment from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel @@ -68,6 +71,11 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [ ] +class DesiredPermissionDict(TypedDict): + group_identifiers: Set[str] + permission_assignments: list[PermissionAssignmentModel] + + class AuthorizationService: """Determine whether a user has permission to perform their request.""" @@ -179,7 +187,7 @@ class AuthorizationService: @classmethod def import_permissions_from_yaml_file( cls, raise_if_missing_user: bool = False - ) -> None: + ) -> DesiredPermissionDict: """Import_permissions_from_yaml_file.""" if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None: raise ( @@ -193,13 +201,16 @@ class AuthorizationService: permission_configs = yaml.safe_load(file) default_group = None + unique_user_group_identifiers: Set[str] = set() if "default_group" in permission_configs: default_group_identifier = permission_configs["default_group"] default_group = GroupService.find_or_create_group(default_group_identifier) + unique_user_group_identifiers.add(default_group_identifier) if "groups" in permission_configs: for group_identifier, group_config in permission_configs["groups"].items(): group = GroupService.find_or_create_group(group_identifier) + unique_user_group_identifiers.add(group_identifier) for username in group_config["users"]: user = UserModel.query.filter_by(username=username).first() if user is None: @@ -212,6 +223,7 @@ class AuthorizationService: continue cls.associate_user_with_group(user, group) + permission_assignments = [] if "permissions" in permission_configs: for _permission_identifier, permission_config in permission_configs[ "permissions" @@ -223,9 +235,10 @@ class AuthorizationService: if "groups" in permission_config: for group_identifier in permission_config["groups"]: group = GroupService.find_or_create_group(group_identifier) - cls.create_permission_for_principal( + unique_user_group_identifiers.add(group_identifier) + permission_assignments.append(cls.create_permission_for_principal( group.principal, permission_target, allowed_permission - ) + )) if "users" in permission_config: for username in permission_config["users"]: user = UserModel.query.filter_by(username=username).first() @@ -235,14 +248,16 @@ class AuthorizationService: .filter(UserModel.username == username) .first() ) - cls.create_permission_for_principal( + permission_assignments.append(cls.create_permission_for_principal( principal, permission_target, allowed_permission - ) + )) if default_group is not None: for user in UserModel.query.all(): cls.associate_user_with_group(user, default_group) + return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments } + @classmethod def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel: """Find_or_create_permission_target.""" @@ -691,17 +706,46 @@ class AuthorizationService: @classmethod def add_permission_from_uri_or_macro( cls, group_identifier: str, permission: str, target: str - ) -> None: + ) -> list[PermissionAssignmentModel]: """Add_permission_from_uri_or_macro.""" group = GroupService.find_or_create_group(group_identifier) permissions_to_assign = cls.explode_permissions(permission, target) + permission_assignments = [] for permission_to_assign in permissions_to_assign: - permission_target = AuthorizationService.find_or_create_permission_target( + permission_target = cls.find_or_create_permission_target( permission_to_assign.target_uri ) - AuthorizationService.create_permission_for_principal( + permission_assignments.append(cls.create_permission_for_principal( group.principal, permission_target, permission_to_assign.permission - ) + )) + return permission_assignments + + @classmethod + def refresh_permissions(cls, group_info: list[dict[str, Any]]) -> None: + """Adds new permission assignments and deletes old ones.""" + initial_permission_assignments = PermissionAssignmentModel.query.all() + result = cls.import_permissions_from_yaml_file() + desired_permission_assignments = result['permission_assignments'] + desired_group_identifiers = result['group_identifiers'] + + for group in group_info: + for username in group['users']: + GroupService.add_user_to_group_or_add_to_waiting(username, group['name']) + for permission in group['permissions']: + for crud_op in permission['actions']: + desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro( + group_identifier=group['name'], target=permission['uri'], permission=crud_op + )) + desired_group_identifiers.add(group['name']) + + for ipa in initial_permission_assignments: + if ipa not in desired_permission_assignments: + db.session.delete(ipa) + + groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all() + for gtd in groups_to_delete: + db.session.delete(gtd) + db.session.commit() class KeycloakAuthorization: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py index aa560009..85f6441f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/group_service.py @@ -1,5 +1,6 @@ """Group_service.""" from typing import Optional +from spiffworkflow_backend.models.user import UserModel from flask_bpmn.models.db import db @@ -22,3 +23,12 @@ class GroupService: db.session.commit() UserService.create_principal(group.id, id_column_name="group_id") return group + + @classmethod + def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None: + group = cls.find_or_create_group(group_identifier) + user = UserModel.query.filter_by(username=username).first() + if user: + UserService.add_user_to_group(user, group) + else: + UserService.add_waiting_group_assignment(username, group) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py index b149ac54..7d000c9d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py @@ -1,5 +1,6 @@ """Test_message_service.""" import pytest +from spiffworkflow_backend.models.group import GroupModel from flask import Flask from flask.testing import FlaskClient from tests.spiffworkflow_backend.helpers.base_test import BaseTest @@ -428,3 +429,47 @@ class TestAuthorizationService(BaseTest): """Test_explode_permissions_with_start_to_incorrect_target.""" with pytest.raises(InvalidPermissionError): AuthorizationService.explode_permissions("start", "/hey/model") + + def test_can_refresh_permissions( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + user = self.find_or_create_user(username="user_one") + admin_user = self.find_or_create_user(username="testadmin1") + + # this group is not mentioned so it will get deleted + GroupService.find_or_create_group("group_two") + assert GroupModel.query.filter_by(identifier="group_two").first() is not None + + group_info = [{ + 'users': ['user_one'], + 'name': 'group_one', + 'permissions': [{ + 'actions': ['create', 'read'], + 'uri': 'PG:hey' + }] + }] + AuthorizationService.refresh_permissions(group_info) + assert GroupModel.query.filter_by(identifier="group_two").first() is None + assert GroupModel.query.filter_by(identifier="group_one").first() is not None + self.assert_user_has_permission(admin_user, "create", "/anything-they-want") + self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey") + self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") + self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo") + + group_info = [{ + 'users': ['user_one'], + 'name': 'group_one', + 'permissions': [{ + 'actions': ['read'], + 'uri': 'PG:hey' + }] + }] + AuthorizationService.refresh_permissions(group_info) + assert GroupModel.query.filter_by(identifier="group_one").first() is not None + self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey") + self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") + self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False) + self.assert_user_has_permission(admin_user, "create", "/anything-they-want")