moved remove permission code to own method and some cleanup

This commit is contained in:
jasquat 2023-05-18 12:35:23 -04:00
parent e6d430853b
commit 0c092c05dd
No known key found for this signature in database
1 changed files with 36 additions and 42 deletions

View File

@ -1,4 +1,3 @@
"""Authorization_service."""
import inspect
import re
from dataclasses import dataclass
@ -40,25 +39,23 @@ from spiffworkflow_backend.services.user_service import UserService
class PermissionsFileNotSetError(Exception):
"""PermissionsFileNotSetError."""
pass
class HumanTaskNotFoundError(Exception):
"""HumanTaskNotFoundError."""
pass
class UserDoesNotHaveAccessToTaskError(Exception):
"""UserDoesNotHaveAccessToTaskError."""
pass
class InvalidPermissionError(Exception):
"""InvalidPermissionError."""
pass
@dataclass
class PermissionToAssign:
"""PermissionToAssign."""
permission: str
target_uri: str
@ -91,7 +88,7 @@ class UserToGroupDict(TypedDict):
group_identifier: str
class DesiredPermissionDict(TypedDict):
class AddedPermissionDict(TypedDict):
group_identifiers: Set[str]
permission_assignments: list[PermissionAssignmentModel]
user_to_group_identifiers: list[UserToGroupDict]
@ -114,7 +111,6 @@ class AuthorizationService:
# https://stackoverflow.com/a/71320673/6090676
@classmethod
def verify_sha256_token(cls, auth_header: Optional[str]) -> None:
"""Verify_sha256_token."""
if auth_header is None:
raise TokenNotProvidedError(
"unauthorized",
@ -130,7 +126,6 @@ class AuthorizationService:
@classmethod
def has_permission(cls, principals: list[PrincipalModel], permission: str, target_uri: str) -> bool:
"""Has_permission."""
principal_ids = [p.id for p in principals]
target_uri_normalized = target_uri.removeprefix(V1_API_PATH_PREFIX)
@ -160,7 +155,6 @@ class AuthorizationService:
@classmethod
def user_has_permission(cls, user: UserModel, permission: str, target_uri: str) -> bool:
"""User_has_permission."""
if user.principal is None:
raise MissingPrincipalError(f"Missing principal for user with id: {user.id}")
@ -186,7 +180,6 @@ class AuthorizationService:
@classmethod
def associate_user_with_group(cls, user: UserModel, group: GroupModel) -> None:
"""Associate_user_with_group."""
user_group_assignemnt = UserGroupAssignmentModel.query.filter_by(user_id=user.id, group_id=group.id).first()
if user_group_assignemnt is None:
user_group_assignemnt = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
@ -194,7 +187,7 @@ class AuthorizationService:
db.session.commit()
@classmethod
def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> DesiredPermissionDict:
def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> AddedPermissionDict:
group_permissions = cls.parse_permissions_yaml_into_group_info()
result = cls.add_permissions_from_group_permissions(group_permissions, user_model)
return result
@ -292,7 +285,6 @@ class AuthorizationService:
@classmethod
def check_for_permission(cls) -> None:
"""Check_for_permission."""
if cls.should_disable_auth_for_request():
return None
@ -326,11 +318,6 @@ class AuthorizationService:
@staticmethod
def decode_auth_token(auth_token: str) -> dict[str, Union[str, None]]:
"""Decode the auth token.
:param auth_token:
:return: integer|string
"""
secret_key = current_app.config.get("SECRET_KEY")
if secret_key is None:
raise KeyError("we need current_app.config to have a SECRET_KEY")
@ -374,10 +361,11 @@ class AuthorizationService:
@classmethod
def create_user_from_sign_in(cls, user_info: dict) -> UserModel:
"""Create_user_from_sign_in."""
"""Name, family_name, given_name, middle_name, nickname, preferred_username,"""
"""Profile, picture, website, gender, birthdate, zoneinfo, locale, and updated_at. """
"""Email."""
"""Fields from user_info.
name, family_name, given_name, middle_name, nickname, preferred_username,
profile, picture, website, gender, birthdate, zoneinfo, locale,updated_at, email.
"""
is_new_user = False
user_attributes = {}
@ -450,7 +438,6 @@ class AuthorizationService:
process_related_path_segment: str,
target_uris: list[str],
) -> list[PermissionToAssign]:
"""Get_permissions_to_assign."""
permissions = permission_set.split(",")
if permission_set == "all":
permissions = ["create", "read", "update", "delete"]
@ -500,7 +487,6 @@ class AuthorizationService:
@classmethod
def set_basic_permissions(cls) -> list[PermissionToAssign]:
"""Set_basic_permissions."""
permissions_to_assign: list[PermissionToAssign] = []
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/process-instances/for-me"))
permissions_to_assign.append(
@ -528,7 +514,6 @@ class AuthorizationService:
@classmethod
def set_process_group_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]:
"""Set_process_group_permissions."""
permissions_to_assign: list[PermissionToAssign] = []
process_group_identifier = target.removeprefix("PG:").replace("/", ":").removeprefix(":")
process_related_path_segment = f"{process_group_identifier}:*"
@ -545,7 +530,6 @@ class AuthorizationService:
@classmethod
def set_process_model_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]:
"""Set_process_model_permissions."""
permissions_to_assign: list[PermissionToAssign] = []
process_model_identifier = target.removeprefix("PM:").replace("/", ":").removeprefix(":")
process_related_path_segment = f"{process_model_identifier}/*"
@ -668,7 +652,7 @@ class AuthorizationService:
@classmethod
def add_permissions_from_group_permissions(
cls, group_permissions: list[GroupPermissionsDict], user_model: Optional[UserModel] = None
) -> DesiredPermissionDict:
) -> AddedPermissionDict:
unique_user_group_identifiers: Set[str] = set()
user_to_group_identifiers: list[UserToGroupDict] = []
permission_assignments = []
@ -715,19 +699,18 @@ class AuthorizationService:
}
@classmethod
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
"""Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all()
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
result = cls.add_permissions_from_group_permissions(group_permissions)
desired_permission_assignments = result["permission_assignments"]
desired_group_identifiers = result["group_identifiers"]
desired_user_to_group_identifiers = result["user_to_group_identifiers"]
def remove_old_permissions_from_added_permissions(
cls,
added_permissions: AddedPermissionDict,
initial_permission_assignments: list[PermissionAssignmentModel],
initial_user_to_group_assignments: list[UserGroupAssignmentModel],
) -> None:
added_permission_assignments = added_permissions["permission_assignments"]
added_group_identifiers = added_permissions["group_identifiers"]
added_user_to_group_identifiers = added_permissions["user_to_group_identifiers"]
for ipa in initial_permission_assignments:
if ipa not in desired_permission_assignments:
if ipa not in added_permission_assignments:
db.session.delete(ipa)
for iutga in initial_user_to_group_assignments:
@ -740,12 +723,23 @@ class AuthorizationService:
"username": iutga.user.username,
"group_identifier": iutga.group.identifier,
}
if current_user_dict not in desired_user_to_group_identifiers:
if current_user_dict not in added_user_to_group_identifiers:
db.session.delete(iutga)
# do not remove the default user group
desired_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all()
added_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(added_group_identifiers)).all()
for gtd in groups_to_delete:
db.session.delete(gtd)
db.session.commit()
@classmethod
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
"""Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all()
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
added_permissions = cls.add_permissions_from_group_permissions(group_permissions)
cls.remove_old_permissions_from_added_permissions(
added_permissions, initial_permission_assignments, initial_user_to_group_assignments
)