added script to refresh permissions w/ burnettk
This commit is contained in:
parent
3de31af94f
commit
99e8dccd6e
|
@ -0,0 +1,40 @@
|
|||
"""Get_env."""
|
||||
from typing import Any
|
||||
|
||||
from spiffworkflow_backend.models.script_attributes_context import (
|
||||
ScriptAttributesContext,
|
||||
)
|
||||
from spiffworkflow_backend.scripts.script import Script
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
|
||||
# add_permission("read", "test/*", "Editors")
|
||||
|
||||
|
||||
class RecreatePermissions(Script):
|
||||
|
||||
def get_description(self) -> str:
|
||||
"""Get_description."""
|
||||
return """Add permissions using a dict.
|
||||
group_info: [
|
||||
{
|
||||
'name': group_identifier,
|
||||
'users': array_of_users,
|
||||
'permissions': [
|
||||
{
|
||||
'actions': array_of_actions - create, read, etc,
|
||||
'uri': target_uri
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
"""
|
||||
|
||||
def run(
|
||||
self,
|
||||
script_attributes_context: ScriptAttributesContext,
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Run."""
|
||||
group_info = args[0]
|
||||
AuthorizationService.refresh_permissions(group_info)
|
|
@ -1,5 +1,7 @@
|
|||
"""Authorization_service."""
|
||||
import inspect
|
||||
from typing import TypedDict
|
||||
from typing import Any, Set
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from hashlib import sha256
|
||||
|
@ -21,6 +23,7 @@ from sqlalchemy import or_
|
|||
from sqlalchemy import text
|
||||
|
||||
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
|
||||
from spiffworkflow_backend.models import permission_assignment
|
||||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
|
||||
|
@ -68,6 +71,11 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
|
|||
]
|
||||
|
||||
|
||||
class DesiredPermissionDict(TypedDict):
|
||||
group_identifiers: Set[str]
|
||||
permission_assignments: list[PermissionAssignmentModel]
|
||||
|
||||
|
||||
class AuthorizationService:
|
||||
"""Determine whether a user has permission to perform their request."""
|
||||
|
||||
|
@ -179,7 +187,7 @@ class AuthorizationService:
|
|||
@classmethod
|
||||
def import_permissions_from_yaml_file(
|
||||
cls, raise_if_missing_user: bool = False
|
||||
) -> None:
|
||||
) -> DesiredPermissionDict:
|
||||
"""Import_permissions_from_yaml_file."""
|
||||
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
|
||||
raise (
|
||||
|
@ -193,13 +201,16 @@ class AuthorizationService:
|
|||
permission_configs = yaml.safe_load(file)
|
||||
|
||||
default_group = None
|
||||
unique_user_group_identifiers: Set[str] = set()
|
||||
if "default_group" in permission_configs:
|
||||
default_group_identifier = permission_configs["default_group"]
|
||||
default_group = GroupService.find_or_create_group(default_group_identifier)
|
||||
unique_user_group_identifiers.add(default_group_identifier)
|
||||
|
||||
if "groups" in permission_configs:
|
||||
for group_identifier, group_config in permission_configs["groups"].items():
|
||||
group = GroupService.find_or_create_group(group_identifier)
|
||||
unique_user_group_identifiers.add(group_identifier)
|
||||
for username in group_config["users"]:
|
||||
user = UserModel.query.filter_by(username=username).first()
|
||||
if user is None:
|
||||
|
@ -212,6 +223,7 @@ class AuthorizationService:
|
|||
continue
|
||||
cls.associate_user_with_group(user, group)
|
||||
|
||||
permission_assignments = []
|
||||
if "permissions" in permission_configs:
|
||||
for _permission_identifier, permission_config in permission_configs[
|
||||
"permissions"
|
||||
|
@ -223,9 +235,10 @@ class AuthorizationService:
|
|||
if "groups" in permission_config:
|
||||
for group_identifier in permission_config["groups"]:
|
||||
group = GroupService.find_or_create_group(group_identifier)
|
||||
cls.create_permission_for_principal(
|
||||
unique_user_group_identifiers.add(group_identifier)
|
||||
permission_assignments.append(cls.create_permission_for_principal(
|
||||
group.principal, permission_target, allowed_permission
|
||||
)
|
||||
))
|
||||
if "users" in permission_config:
|
||||
for username in permission_config["users"]:
|
||||
user = UserModel.query.filter_by(username=username).first()
|
||||
|
@ -235,14 +248,16 @@ class AuthorizationService:
|
|||
.filter(UserModel.username == username)
|
||||
.first()
|
||||
)
|
||||
cls.create_permission_for_principal(
|
||||
permission_assignments.append(cls.create_permission_for_principal(
|
||||
principal, permission_target, allowed_permission
|
||||
)
|
||||
))
|
||||
|
||||
if default_group is not None:
|
||||
for user in UserModel.query.all():
|
||||
cls.associate_user_with_group(user, default_group)
|
||||
|
||||
return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments }
|
||||
|
||||
@classmethod
|
||||
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
|
||||
"""Find_or_create_permission_target."""
|
||||
|
@ -691,17 +706,46 @@ class AuthorizationService:
|
|||
@classmethod
|
||||
def add_permission_from_uri_or_macro(
|
||||
cls, group_identifier: str, permission: str, target: str
|
||||
) -> None:
|
||||
) -> list[PermissionAssignmentModel]:
|
||||
"""Add_permission_from_uri_or_macro."""
|
||||
group = GroupService.find_or_create_group(group_identifier)
|
||||
permissions_to_assign = cls.explode_permissions(permission, target)
|
||||
permission_assignments = []
|
||||
for permission_to_assign in permissions_to_assign:
|
||||
permission_target = AuthorizationService.find_or_create_permission_target(
|
||||
permission_target = cls.find_or_create_permission_target(
|
||||
permission_to_assign.target_uri
|
||||
)
|
||||
AuthorizationService.create_permission_for_principal(
|
||||
permission_assignments.append(cls.create_permission_for_principal(
|
||||
group.principal, permission_target, permission_to_assign.permission
|
||||
)
|
||||
))
|
||||
return permission_assignments
|
||||
|
||||
@classmethod
|
||||
def refresh_permissions(cls, group_info: list[dict[str, Any]]) -> None:
|
||||
"""Adds new permission assignments and deletes old ones."""
|
||||
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
||||
result = cls.import_permissions_from_yaml_file()
|
||||
desired_permission_assignments = result['permission_assignments']
|
||||
desired_group_identifiers = result['group_identifiers']
|
||||
|
||||
for group in group_info:
|
||||
for username in group['users']:
|
||||
GroupService.add_user_to_group_or_add_to_waiting(username, group['name'])
|
||||
for permission in group['permissions']:
|
||||
for crud_op in permission['actions']:
|
||||
desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro(
|
||||
group_identifier=group['name'], target=permission['uri'], permission=crud_op
|
||||
))
|
||||
desired_group_identifiers.add(group['name'])
|
||||
|
||||
for ipa in initial_permission_assignments:
|
||||
if ipa not in desired_permission_assignments:
|
||||
db.session.delete(ipa)
|
||||
|
||||
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all()
|
||||
for gtd in groups_to_delete:
|
||||
db.session.delete(gtd)
|
||||
db.session.commit()
|
||||
|
||||
|
||||
class KeycloakAuthorization:
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Group_service."""
|
||||
from typing import Optional
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
|
||||
|
@ -22,3 +23,12 @@ class GroupService:
|
|||
db.session.commit()
|
||||
UserService.create_principal(group.id, id_column_name="group_id")
|
||||
return group
|
||||
|
||||
@classmethod
|
||||
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
|
||||
group = cls.find_or_create_group(group_identifier)
|
||||
user = UserModel.query.filter_by(username=username).first()
|
||||
if user:
|
||||
UserService.add_user_to_group(user, group)
|
||||
else:
|
||||
UserService.add_waiting_group_assignment(username, group)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""Test_message_service."""
|
||||
import pytest
|
||||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from flask import Flask
|
||||
from flask.testing import FlaskClient
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
@ -428,3 +429,47 @@ class TestAuthorizationService(BaseTest):
|
|||
"""Test_explode_permissions_with_start_to_incorrect_target."""
|
||||
with pytest.raises(InvalidPermissionError):
|
||||
AuthorizationService.explode_permissions("start", "/hey/model")
|
||||
|
||||
def test_can_refresh_permissions(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
user = self.find_or_create_user(username="user_one")
|
||||
admin_user = self.find_or_create_user(username="testadmin1")
|
||||
|
||||
# this group is not mentioned so it will get deleted
|
||||
GroupService.find_or_create_group("group_two")
|
||||
assert GroupModel.query.filter_by(identifier="group_two").first() is not None
|
||||
|
||||
group_info = [{
|
||||
'users': ['user_one'],
|
||||
'name': 'group_one',
|
||||
'permissions': [{
|
||||
'actions': ['create', 'read'],
|
||||
'uri': 'PG:hey'
|
||||
}]
|
||||
}]
|
||||
AuthorizationService.refresh_permissions(group_info)
|
||||
assert GroupModel.query.filter_by(identifier="group_two").first() is None
|
||||
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
|
||||
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
|
||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
|
||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
|
||||
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
|
||||
|
||||
group_info = [{
|
||||
'users': ['user_one'],
|
||||
'name': 'group_one',
|
||||
'permissions': [{
|
||||
'actions': ['read'],
|
||||
'uri': 'PG:hey'
|
||||
}]
|
||||
}]
|
||||
AuthorizationService.refresh_permissions(group_info)
|
||||
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
|
||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
|
||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
|
||||
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False)
|
||||
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
|
||||
|
|
Loading…
Reference in New Issue