moved remove permission code to own method and some cleanup
This commit is contained in:
parent
40b3246eb7
commit
a445badcd1
|
@ -1,4 +1,3 @@
|
|||
"""Authorization_service."""
|
||||
import inspect
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
@ -40,25 +39,23 @@ from spiffworkflow_backend.services.user_service import UserService
|
|||
|
||||
|
||||
class PermissionsFileNotSetError(Exception):
|
||||
"""PermissionsFileNotSetError."""
|
||||
pass
|
||||
|
||||
|
||||
class HumanTaskNotFoundError(Exception):
|
||||
"""HumanTaskNotFoundError."""
|
||||
pass
|
||||
|
||||
|
||||
class UserDoesNotHaveAccessToTaskError(Exception):
|
||||
"""UserDoesNotHaveAccessToTaskError."""
|
||||
pass
|
||||
|
||||
|
||||
class InvalidPermissionError(Exception):
|
||||
"""InvalidPermissionError."""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class PermissionToAssign:
|
||||
"""PermissionToAssign."""
|
||||
|
||||
permission: str
|
||||
target_uri: str
|
||||
|
||||
|
@ -91,7 +88,7 @@ class UserToGroupDict(TypedDict):
|
|||
group_identifier: str
|
||||
|
||||
|
||||
class DesiredPermissionDict(TypedDict):
|
||||
class AddedPermissionDict(TypedDict):
|
||||
group_identifiers: Set[str]
|
||||
permission_assignments: list[PermissionAssignmentModel]
|
||||
user_to_group_identifiers: list[UserToGroupDict]
|
||||
|
@ -114,7 +111,6 @@ class AuthorizationService:
|
|||
# https://stackoverflow.com/a/71320673/6090676
|
||||
@classmethod
|
||||
def verify_sha256_token(cls, auth_header: Optional[str]) -> None:
|
||||
"""Verify_sha256_token."""
|
||||
if auth_header is None:
|
||||
raise TokenNotProvidedError(
|
||||
"unauthorized",
|
||||
|
@ -130,7 +126,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def has_permission(cls, principals: list[PrincipalModel], permission: str, target_uri: str) -> bool:
|
||||
"""Has_permission."""
|
||||
principal_ids = [p.id for p in principals]
|
||||
target_uri_normalized = target_uri.removeprefix(V1_API_PATH_PREFIX)
|
||||
|
||||
|
@ -160,7 +155,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def user_has_permission(cls, user: UserModel, permission: str, target_uri: str) -> bool:
|
||||
"""User_has_permission."""
|
||||
if user.principal is None:
|
||||
raise MissingPrincipalError(f"Missing principal for user with id: {user.id}")
|
||||
|
||||
|
@ -186,7 +180,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def associate_user_with_group(cls, user: UserModel, group: GroupModel) -> None:
|
||||
"""Associate_user_with_group."""
|
||||
user_group_assignemnt = UserGroupAssignmentModel.query.filter_by(user_id=user.id, group_id=group.id).first()
|
||||
if user_group_assignemnt is None:
|
||||
user_group_assignemnt = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
|
||||
|
@ -194,7 +187,7 @@ class AuthorizationService:
|
|||
db.session.commit()
|
||||
|
||||
@classmethod
|
||||
def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> DesiredPermissionDict:
|
||||
def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> AddedPermissionDict:
|
||||
group_permissions = cls.parse_permissions_yaml_into_group_info()
|
||||
result = cls.add_permissions_from_group_permissions(group_permissions, user_model)
|
||||
return result
|
||||
|
@ -292,7 +285,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def check_for_permission(cls) -> None:
|
||||
"""Check_for_permission."""
|
||||
if cls.should_disable_auth_for_request():
|
||||
return None
|
||||
|
||||
|
@ -326,11 +318,6 @@ class AuthorizationService:
|
|||
|
||||
@staticmethod
|
||||
def decode_auth_token(auth_token: str) -> dict[str, Union[str, None]]:
|
||||
"""Decode the auth token.
|
||||
|
||||
:param auth_token:
|
||||
:return: integer|string
|
||||
"""
|
||||
secret_key = current_app.config.get("SECRET_KEY")
|
||||
if secret_key is None:
|
||||
raise KeyError("we need current_app.config to have a SECRET_KEY")
|
||||
|
@ -374,10 +361,11 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def create_user_from_sign_in(cls, user_info: dict) -> UserModel:
|
||||
"""Create_user_from_sign_in."""
|
||||
"""Name, family_name, given_name, middle_name, nickname, preferred_username,"""
|
||||
"""Profile, picture, website, gender, birthdate, zoneinfo, locale, and updated_at. """
|
||||
"""Email."""
|
||||
"""Fields from user_info.
|
||||
|
||||
name, family_name, given_name, middle_name, nickname, preferred_username,
|
||||
profile, picture, website, gender, birthdate, zoneinfo, locale,updated_at, email.
|
||||
"""
|
||||
is_new_user = False
|
||||
user_attributes = {}
|
||||
|
||||
|
@ -450,7 +438,6 @@ class AuthorizationService:
|
|||
process_related_path_segment: str,
|
||||
target_uris: list[str],
|
||||
) -> list[PermissionToAssign]:
|
||||
"""Get_permissions_to_assign."""
|
||||
permissions = permission_set.split(",")
|
||||
if permission_set == "all":
|
||||
permissions = ["create", "read", "update", "delete"]
|
||||
|
@ -500,7 +487,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def set_basic_permissions(cls) -> list[PermissionToAssign]:
|
||||
"""Set_basic_permissions."""
|
||||
permissions_to_assign: list[PermissionToAssign] = []
|
||||
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/process-instances/for-me"))
|
||||
permissions_to_assign.append(
|
||||
|
@ -528,7 +514,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def set_process_group_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]:
|
||||
"""Set_process_group_permissions."""
|
||||
permissions_to_assign: list[PermissionToAssign] = []
|
||||
process_group_identifier = target.removeprefix("PG:").replace("/", ":").removeprefix(":")
|
||||
process_related_path_segment = f"{process_group_identifier}:*"
|
||||
|
@ -545,7 +530,6 @@ class AuthorizationService:
|
|||
|
||||
@classmethod
|
||||
def set_process_model_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]:
|
||||
"""Set_process_model_permissions."""
|
||||
permissions_to_assign: list[PermissionToAssign] = []
|
||||
process_model_identifier = target.removeprefix("PM:").replace("/", ":").removeprefix(":")
|
||||
process_related_path_segment = f"{process_model_identifier}/*"
|
||||
|
@ -668,7 +652,7 @@ class AuthorizationService:
|
|||
@classmethod
|
||||
def add_permissions_from_group_permissions(
|
||||
cls, group_permissions: list[GroupPermissionsDict], user_model: Optional[UserModel] = None
|
||||
) -> DesiredPermissionDict:
|
||||
) -> AddedPermissionDict:
|
||||
unique_user_group_identifiers: Set[str] = set()
|
||||
user_to_group_identifiers: list[UserToGroupDict] = []
|
||||
permission_assignments = []
|
||||
|
@ -715,19 +699,18 @@ class AuthorizationService:
|
|||
}
|
||||
|
||||
@classmethod
|
||||
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
|
||||
"""Adds new permission assignments and deletes old ones."""
|
||||
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
||||
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
||||
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
|
||||
|
||||
result = cls.add_permissions_from_group_permissions(group_permissions)
|
||||
desired_permission_assignments = result["permission_assignments"]
|
||||
desired_group_identifiers = result["group_identifiers"]
|
||||
desired_user_to_group_identifiers = result["user_to_group_identifiers"]
|
||||
def remove_old_permissions_from_added_permissions(
|
||||
cls,
|
||||
added_permissions: AddedPermissionDict,
|
||||
initial_permission_assignments: list[PermissionAssignmentModel],
|
||||
initial_user_to_group_assignments: list[UserGroupAssignmentModel],
|
||||
) -> None:
|
||||
added_permission_assignments = added_permissions["permission_assignments"]
|
||||
added_group_identifiers = added_permissions["group_identifiers"]
|
||||
added_user_to_group_identifiers = added_permissions["user_to_group_identifiers"]
|
||||
|
||||
for ipa in initial_permission_assignments:
|
||||
if ipa not in desired_permission_assignments:
|
||||
if ipa not in added_permission_assignments:
|
||||
db.session.delete(ipa)
|
||||
|
||||
for iutga in initial_user_to_group_assignments:
|
||||
|
@ -740,12 +723,23 @@ class AuthorizationService:
|
|||
"username": iutga.user.username,
|
||||
"group_identifier": iutga.group.identifier,
|
||||
}
|
||||
if current_user_dict not in desired_user_to_group_identifiers:
|
||||
if current_user_dict not in added_user_to_group_identifiers:
|
||||
db.session.delete(iutga)
|
||||
|
||||
# do not remove the default user group
|
||||
desired_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
|
||||
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all()
|
||||
added_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
|
||||
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(added_group_identifiers)).all()
|
||||
for gtd in groups_to_delete:
|
||||
db.session.delete(gtd)
|
||||
db.session.commit()
|
||||
|
||||
@classmethod
|
||||
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
|
||||
"""Adds new permission assignments and deletes old ones."""
|
||||
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
||||
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
||||
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
|
||||
added_permissions = cls.add_permissions_from_group_permissions(group_permissions)
|
||||
cls.remove_old_permissions_from_added_permissions(
|
||||
added_permissions, initial_permission_assignments, initial_user_to_group_assignments
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue