added script to refresh permissions w/ burnettk

This commit is contained in:
jasquat 2022-12-22 16:14:52 -05:00
parent ff61026ff5
commit aa6b46e807
4 changed files with 148 additions and 9 deletions

View File

@ -0,0 +1,40 @@
"""Get_env."""
from typing import Any
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.authorization_service import AuthorizationService
# add_permission("read", "test/*", "Editors")
class RecreatePermissions(Script):
def get_description(self) -> str:
"""Get_description."""
return """Add permissions using a dict.
group_info: [
{
'name': group_identifier,
'users': array_of_users,
'permissions': [
{
'actions': array_of_actions - create, read, etc,
'uri': target_uri
}
]
}
]
"""
def run(
self,
script_attributes_context: ScriptAttributesContext,
*args: Any,
**kwargs: Any,
) -> Any:
"""Run."""
group_info = args[0]
AuthorizationService.refresh_permissions(group_info)

View File

@ -1,5 +1,7 @@
"""Authorization_service."""
import inspect
from typing import TypedDict
from typing import Any, Set
import re
from dataclasses import dataclass
from hashlib import sha256
@ -21,6 +23,7 @@ from sqlalchemy import or_
from sqlalchemy import text
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
from spiffworkflow_backend.models import permission_assignment
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
@ -68,6 +71,11 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
]
class DesiredPermissionDict(TypedDict):
group_identifiers: Set[str]
permission_assignments: list[PermissionAssignmentModel]
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@ -179,7 +187,7 @@ class AuthorizationService:
@classmethod
def import_permissions_from_yaml_file(
cls, raise_if_missing_user: bool = False
) -> None:
) -> DesiredPermissionDict:
"""Import_permissions_from_yaml_file."""
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
raise (
@ -193,13 +201,16 @@ class AuthorizationService:
permission_configs = yaml.safe_load(file)
default_group = None
unique_user_group_identifiers: Set[str] = set()
if "default_group" in permission_configs:
default_group_identifier = permission_configs["default_group"]
default_group = GroupService.find_or_create_group(default_group_identifier)
unique_user_group_identifiers.add(default_group_identifier)
if "groups" in permission_configs:
for group_identifier, group_config in permission_configs["groups"].items():
group = GroupService.find_or_create_group(group_identifier)
unique_user_group_identifiers.add(group_identifier)
for username in group_config["users"]:
user = UserModel.query.filter_by(username=username).first()
if user is None:
@ -212,6 +223,7 @@ class AuthorizationService:
continue
cls.associate_user_with_group(user, group)
permission_assignments = []
if "permissions" in permission_configs:
for _permission_identifier, permission_config in permission_configs[
"permissions"
@ -223,9 +235,10 @@ class AuthorizationService:
if "groups" in permission_config:
for group_identifier in permission_config["groups"]:
group = GroupService.find_or_create_group(group_identifier)
cls.create_permission_for_principal(
unique_user_group_identifiers.add(group_identifier)
permission_assignments.append(cls.create_permission_for_principal(
group.principal, permission_target, allowed_permission
)
))
if "users" in permission_config:
for username in permission_config["users"]:
user = UserModel.query.filter_by(username=username).first()
@ -235,14 +248,16 @@ class AuthorizationService:
.filter(UserModel.username == username)
.first()
)
cls.create_permission_for_principal(
permission_assignments.append(cls.create_permission_for_principal(
principal, permission_target, allowed_permission
)
))
if default_group is not None:
for user in UserModel.query.all():
cls.associate_user_with_group(user, default_group)
return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments }
@classmethod
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
"""Find_or_create_permission_target."""
@ -691,17 +706,46 @@ class AuthorizationService:
@classmethod
def add_permission_from_uri_or_macro(
cls, group_identifier: str, permission: str, target: str
) -> None:
) -> list[PermissionAssignmentModel]:
"""Add_permission_from_uri_or_macro."""
group = GroupService.find_or_create_group(group_identifier)
permissions_to_assign = cls.explode_permissions(permission, target)
permission_assignments = []
for permission_to_assign in permissions_to_assign:
permission_target = AuthorizationService.find_or_create_permission_target(
permission_target = cls.find_or_create_permission_target(
permission_to_assign.target_uri
)
AuthorizationService.create_permission_for_principal(
permission_assignments.append(cls.create_permission_for_principal(
group.principal, permission_target, permission_to_assign.permission
)
))
return permission_assignments
@classmethod
def refresh_permissions(cls, group_info: list[dict[str, Any]]) -> None:
"""Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all()
result = cls.import_permissions_from_yaml_file()
desired_permission_assignments = result['permission_assignments']
desired_group_identifiers = result['group_identifiers']
for group in group_info:
for username in group['users']:
GroupService.add_user_to_group_or_add_to_waiting(username, group['name'])
for permission in group['permissions']:
for crud_op in permission['actions']:
desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro(
group_identifier=group['name'], target=permission['uri'], permission=crud_op
))
desired_group_identifiers.add(group['name'])
for ipa in initial_permission_assignments:
if ipa not in desired_permission_assignments:
db.session.delete(ipa)
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all()
for gtd in groups_to_delete:
db.session.delete(gtd)
db.session.commit()
class KeycloakAuthorization:

View File

@ -1,5 +1,6 @@
"""Group_service."""
from typing import Optional
from spiffworkflow_backend.models.user import UserModel
from flask_bpmn.models.db import db
@ -22,3 +23,12 @@ class GroupService:
db.session.commit()
UserService.create_principal(group.id, id_column_name="group_id")
return group
@classmethod
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
group = cls.find_or_create_group(group_identifier)
user = UserModel.query.filter_by(username=username).first()
if user:
UserService.add_user_to_group(user, group)
else:
UserService.add_waiting_group_assignment(username, group)

View File

@ -1,5 +1,6 @@
"""Test_message_service."""
import pytest
from spiffworkflow_backend.models.group import GroupModel
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -428,3 +429,47 @@ class TestAuthorizationService(BaseTest):
"""Test_explode_permissions_with_start_to_incorrect_target."""
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions("start", "/hey/model")
def test_can_refresh_permissions(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
user = self.find_or_create_user(username="user_one")
admin_user = self.find_or_create_user(username="testadmin1")
# this group is not mentioned so it will get deleted
GroupService.find_or_create_group("group_two")
assert GroupModel.query.filter_by(identifier="group_two").first() is not None
group_info = [{
'users': ['user_one'],
'name': 'group_one',
'permissions': [{
'actions': ['create', 'read'],
'uri': 'PG:hey'
}]
}]
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_two").first() is None
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
group_info = [{
'users': ['user_one'],
'name': 'group_one',
'permissions': [{
'actions': ['read'],
'uri': 'PG:hey'
}]
}]
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False)
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")