diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py index 4981af93d..4a49d7b57 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/scripts/refresh_permissions.py @@ -34,6 +34,5 @@ class RefreshPermissions(Script): *args: Any, **kwargs: Any, ) -> Any: - """Run.""" group_info = args[0] AuthorizationService.refresh_permissions(group_info) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py index 761f14a2f..ccf51cd8f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py @@ -94,13 +94,22 @@ class UserToGroupDict(TypedDict): class DesiredPermissionDict(TypedDict): - """DesiredPermissionDict.""" - group_identifiers: Set[str] permission_assignments: list[PermissionAssignmentModel] user_to_group_identifiers: list[UserToGroupDict] +class DesiredGroupPermissionDict(TypedDict): + actions: list[str] + uri: str + + +class GroupPermissionsDict(TypedDict): + users: list[str] + name: str + permissions: list[DesiredGroupPermissionDict] + + class AuthorizationService: """Determine whether a user has permission to perform their request.""" @@ -699,17 +708,57 @@ class AuthorizationService: return permission_assignments @classmethod - def refresh_permissions(cls, group_info: list[dict[str, Any]]) -> None: + def parse_permissions_yaml_into_group_info(cls) -> list[GroupPermissionsDict]: + if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None: + raise ( + PermissionsFileNotSetError( + "SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME needs to be set in order to import permissions" + ) + ) + + permission_configs = None + with open(current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH"]) as file: + permission_configs = yaml.safe_load(file) + + group_permissions: list[GroupPermissionsDict] = [] + group_permissions_by_group: dict[str, GroupPermissionsDict] = {} + if "default_group" in permission_configs: + default_group_identifier = permission_configs["default_group"] + # group_permissions.append({"name": default_group_identifier, "users": [], "permissions": []}) + group_permissions_by_group[default_group_identifier] = {"name": default_group_identifier, "users": [], "permissions": []} + + if "groups" in permission_configs: + for group_identifier, group_config in permission_configs["groups"].items(): + group_info: GroupPermissionsDict = {"name": group_identifier, "users": [], "permissions": []} + for username in group_config["users"]: + group_info['users'].append(username) + group_permissions_by_group[group_identifier] = group_info + + if "permissions" in permission_configs: + for _permission_identifier, permission_config in permission_configs["permissions"].items(): + uri = permission_config["uri"] + for group_identifier in permission_config["groups"]: + group_permissions_by_group[group_identifier]['permissions'].append({'actions': permission_config["allowed_permissions"], "uri": uri}) + + for _group_identifier, group_permission in group_permissions_by_group.items(): + group_permissions.append(group_permission) + + return group_permissions + + @classmethod + def refresh_permissions(cls, group_info: list[GroupPermissionsDict]) -> None: """Adds new permission assignments and deletes old ones.""" initial_permission_assignments = PermissionAssignmentModel.query.all() initial_user_to_group_assignments = UserGroupAssignmentModel.query.all() - result = cls.import_permissions_from_yaml_file() - desired_permission_assignments = result["permission_assignments"] - desired_group_identifiers = result["group_identifiers"] - desired_user_to_group_identifiers = result["user_to_group_identifiers"] + group_info = group_info + cls.parse_permissions_yaml_into_group_info() + + desired_group_identifiers: Set[str] = set() + desired_permission_assignments: list[PermissionAssignmentModel] = [] + desired_user_to_group_identifiers: list[UserToGroupDict] = [] for group in group_info: group_identifier = group["name"] + GroupService.find_or_create_group(group_identifier) for username in group["users"]: user_to_group_dict: UserToGroupDict = { "username": username, diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py index b742c4639..5712cb62c 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_authorization_service.py @@ -7,7 +7,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserNotFoundError -from spiffworkflow_backend.services.authorization_service import AuthorizationService +from spiffworkflow_backend.services.authorization_service import AuthorizationService, GroupPermissionsDict from spiffworkflow_backend.services.authorization_service import InvalidPermissionError from spiffworkflow_backend.services.group_service import GroupService from spiffworkflow_backend.services.process_instance_processor import ( @@ -399,7 +399,7 @@ class TestAuthorizationService(BaseTest): GroupService.find_or_create_group("group_three") assert GroupModel.query.filter_by(identifier="group_three").first() is not None - group_info = [ + group_info: list[GroupPermissionsDict] = [ { "users": ["user_one", "user_two"], "name": "group_one",