added parse method to turn the yaml into the same format as the incoming perms from the dmn tables w/ burnettk

This commit is contained in:
jasquat 2023-05-18 10:02:07 -04:00
parent 7bfe43d617
commit 5b793d5a81
3 changed files with 58 additions and 10 deletions

View File

@ -34,6 +34,5 @@ class RefreshPermissions(Script):
*args: Any,
**kwargs: Any,
) -> Any:
"""Run."""
group_info = args[0]
AuthorizationService.refresh_permissions(group_info)

View File

@ -94,13 +94,22 @@ class UserToGroupDict(TypedDict):
class DesiredPermissionDict(TypedDict):
"""DesiredPermissionDict."""
group_identifiers: Set[str]
permission_assignments: list[PermissionAssignmentModel]
user_to_group_identifiers: list[UserToGroupDict]
class DesiredGroupPermissionDict(TypedDict):
actions: list[str]
uri: str
class GroupPermissionsDict(TypedDict):
users: list[str]
name: str
permissions: list[DesiredGroupPermissionDict]
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@ -699,17 +708,57 @@ class AuthorizationService:
return permission_assignments
@classmethod
def refresh_permissions(cls, group_info: list[dict[str, Any]]) -> None:
def parse_permissions_yaml_into_group_info(cls) -> list[GroupPermissionsDict]:
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
raise (
PermissionsFileNotSetError(
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME needs to be set in order to import permissions"
)
)
permission_configs = None
with open(current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH"]) as file:
permission_configs = yaml.safe_load(file)
group_permissions: list[GroupPermissionsDict] = []
group_permissions_by_group: dict[str, GroupPermissionsDict] = {}
if "default_group" in permission_configs:
default_group_identifier = permission_configs["default_group"]
# group_permissions.append({"name": default_group_identifier, "users": [], "permissions": []})
group_permissions_by_group[default_group_identifier] = {"name": default_group_identifier, "users": [], "permissions": []}
if "groups" in permission_configs:
for group_identifier, group_config in permission_configs["groups"].items():
group_info: GroupPermissionsDict = {"name": group_identifier, "users": [], "permissions": []}
for username in group_config["users"]:
group_info['users'].append(username)
group_permissions_by_group[group_identifier] = group_info
if "permissions" in permission_configs:
for _permission_identifier, permission_config in permission_configs["permissions"].items():
uri = permission_config["uri"]
for group_identifier in permission_config["groups"]:
group_permissions_by_group[group_identifier]['permissions'].append({'actions': permission_config["allowed_permissions"], "uri": uri})
for _group_identifier, group_permission in group_permissions_by_group.items():
group_permissions.append(group_permission)
return group_permissions
@classmethod
def refresh_permissions(cls, group_info: list[GroupPermissionsDict]) -> None:
"""Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all()
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
result = cls.import_permissions_from_yaml_file()
desired_permission_assignments = result["permission_assignments"]
desired_group_identifiers = result["group_identifiers"]
desired_user_to_group_identifiers = result["user_to_group_identifiers"]
group_info = group_info + cls.parse_permissions_yaml_into_group_info()
desired_group_identifiers: Set[str] = set()
desired_permission_assignments: list[PermissionAssignmentModel] = []
desired_user_to_group_identifiers: list[UserToGroupDict] = []
for group in group_info:
group_identifier = group["name"]
GroupService.find_or_create_group(group_identifier)
for username in group["users"]:
user_to_group_dict: UserToGroupDict = {
"username": username,

View File

@ -7,7 +7,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import AuthorizationService, GroupPermissionsDict
from spiffworkflow_backend.services.authorization_service import InvalidPermissionError
from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.process_instance_processor import (
@ -399,7 +399,7 @@ class TestAuthorizationService(BaseTest):
GroupService.find_or_create_group("group_three")
assert GroupModel.query.filter_by(identifier="group_three").first() is not None
group_info = [
group_info: list[GroupPermissionsDict] = [
{
"users": ["user_one", "user_two"],
"name": "group_one",