diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py index 32efe6d8..1ff58394 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/authorization_service.py @@ -1,4 +1,3 @@ -"""Authorization_service.""" import inspect import re from dataclasses import dataclass @@ -40,25 +39,23 @@ from spiffworkflow_backend.services.user_service import UserService class PermissionsFileNotSetError(Exception): - """PermissionsFileNotSetError.""" + pass class HumanTaskNotFoundError(Exception): - """HumanTaskNotFoundError.""" + pass class UserDoesNotHaveAccessToTaskError(Exception): - """UserDoesNotHaveAccessToTaskError.""" + pass class InvalidPermissionError(Exception): - """InvalidPermissionError.""" + pass @dataclass class PermissionToAssign: - """PermissionToAssign.""" - permission: str target_uri: str @@ -91,7 +88,7 @@ class UserToGroupDict(TypedDict): group_identifier: str -class DesiredPermissionDict(TypedDict): +class AddedPermissionDict(TypedDict): group_identifiers: Set[str] permission_assignments: list[PermissionAssignmentModel] user_to_group_identifiers: list[UserToGroupDict] @@ -114,7 +111,6 @@ class AuthorizationService: # https://stackoverflow.com/a/71320673/6090676 @classmethod def verify_sha256_token(cls, auth_header: Optional[str]) -> None: - """Verify_sha256_token.""" if auth_header is None: raise TokenNotProvidedError( "unauthorized", @@ -130,7 +126,6 @@ class AuthorizationService: @classmethod def has_permission(cls, principals: list[PrincipalModel], permission: str, target_uri: str) -> bool: - """Has_permission.""" principal_ids = [p.id for p in principals] target_uri_normalized = target_uri.removeprefix(V1_API_PATH_PREFIX) @@ -160,7 +155,6 @@ class AuthorizationService: @classmethod def user_has_permission(cls, user: UserModel, permission: str, target_uri: str) -> bool: - """User_has_permission.""" if user.principal is None: raise MissingPrincipalError(f"Missing principal for user with id: {user.id}") @@ -186,7 +180,6 @@ class AuthorizationService: @classmethod def associate_user_with_group(cls, user: UserModel, group: GroupModel) -> None: - """Associate_user_with_group.""" user_group_assignemnt = UserGroupAssignmentModel.query.filter_by(user_id=user.id, group_id=group.id).first() if user_group_assignemnt is None: user_group_assignemnt = UserGroupAssignmentModel(user_id=user.id, group_id=group.id) @@ -194,7 +187,7 @@ class AuthorizationService: db.session.commit() @classmethod - def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> DesiredPermissionDict: + def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> AddedPermissionDict: group_permissions = cls.parse_permissions_yaml_into_group_info() result = cls.add_permissions_from_group_permissions(group_permissions, user_model) return result @@ -292,7 +285,6 @@ class AuthorizationService: @classmethod def check_for_permission(cls) -> None: - """Check_for_permission.""" if cls.should_disable_auth_for_request(): return None @@ -326,11 +318,6 @@ class AuthorizationService: @staticmethod def decode_auth_token(auth_token: str) -> dict[str, Union[str, None]]: - """Decode the auth token. - - :param auth_token: - :return: integer|string - """ secret_key = current_app.config.get("SECRET_KEY") if secret_key is None: raise KeyError("we need current_app.config to have a SECRET_KEY") @@ -374,10 +361,11 @@ class AuthorizationService: @classmethod def create_user_from_sign_in(cls, user_info: dict) -> UserModel: - """Create_user_from_sign_in.""" - """Name, family_name, given_name, middle_name, nickname, preferred_username,""" - """Profile, picture, website, gender, birthdate, zoneinfo, locale, and updated_at. """ - """Email.""" + """Fields from user_info. + + name, family_name, given_name, middle_name, nickname, preferred_username, + profile, picture, website, gender, birthdate, zoneinfo, locale,updated_at, email. + """ is_new_user = False user_attributes = {} @@ -450,7 +438,6 @@ class AuthorizationService: process_related_path_segment: str, target_uris: list[str], ) -> list[PermissionToAssign]: - """Get_permissions_to_assign.""" permissions = permission_set.split(",") if permission_set == "all": permissions = ["create", "read", "update", "delete"] @@ -500,7 +487,6 @@ class AuthorizationService: @classmethod def set_basic_permissions(cls) -> list[PermissionToAssign]: - """Set_basic_permissions.""" permissions_to_assign: list[PermissionToAssign] = [] permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/process-instances/for-me")) permissions_to_assign.append( @@ -528,7 +514,6 @@ class AuthorizationService: @classmethod def set_process_group_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]: - """Set_process_group_permissions.""" permissions_to_assign: list[PermissionToAssign] = [] process_group_identifier = target.removeprefix("PG:").replace("/", ":").removeprefix(":") process_related_path_segment = f"{process_group_identifier}:*" @@ -545,7 +530,6 @@ class AuthorizationService: @classmethod def set_process_model_permissions(cls, target: str, permission_set: str) -> list[PermissionToAssign]: - """Set_process_model_permissions.""" permissions_to_assign: list[PermissionToAssign] = [] process_model_identifier = target.removeprefix("PM:").replace("/", ":").removeprefix(":") process_related_path_segment = f"{process_model_identifier}/*" @@ -668,7 +652,7 @@ class AuthorizationService: @classmethod def add_permissions_from_group_permissions( cls, group_permissions: list[GroupPermissionsDict], user_model: Optional[UserModel] = None - ) -> DesiredPermissionDict: + ) -> AddedPermissionDict: unique_user_group_identifiers: Set[str] = set() user_to_group_identifiers: list[UserToGroupDict] = [] permission_assignments = [] @@ -715,19 +699,18 @@ class AuthorizationService: } @classmethod - def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None: - """Adds new permission assignments and deletes old ones.""" - initial_permission_assignments = PermissionAssignmentModel.query.all() - initial_user_to_group_assignments = UserGroupAssignmentModel.query.all() - group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info() - - result = cls.add_permissions_from_group_permissions(group_permissions) - desired_permission_assignments = result["permission_assignments"] - desired_group_identifiers = result["group_identifiers"] - desired_user_to_group_identifiers = result["user_to_group_identifiers"] + def remove_old_permissions_from_added_permissions( + cls, + added_permissions: AddedPermissionDict, + initial_permission_assignments: list[PermissionAssignmentModel], + initial_user_to_group_assignments: list[UserGroupAssignmentModel], + ) -> None: + added_permission_assignments = added_permissions["permission_assignments"] + added_group_identifiers = added_permissions["group_identifiers"] + added_user_to_group_identifiers = added_permissions["user_to_group_identifiers"] for ipa in initial_permission_assignments: - if ipa not in desired_permission_assignments: + if ipa not in added_permission_assignments: db.session.delete(ipa) for iutga in initial_user_to_group_assignments: @@ -740,12 +723,23 @@ class AuthorizationService: "username": iutga.user.username, "group_identifier": iutga.group.identifier, } - if current_user_dict not in desired_user_to_group_identifiers: + if current_user_dict not in added_user_to_group_identifiers: db.session.delete(iutga) # do not remove the default user group - desired_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"]) - groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all() + added_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"]) + groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(added_group_identifiers)).all() for gtd in groups_to_delete: db.session.delete(gtd) db.session.commit() + + @classmethod + def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None: + """Adds new permission assignments and deletes old ones.""" + initial_permission_assignments = PermissionAssignmentModel.query.all() + initial_user_to_group_assignments = UserGroupAssignmentModel.query.all() + group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info() + added_permissions = cls.add_permissions_from_group_permissions(group_permissions) + cls.remove_old_permissions_from_added_permissions( + added_permissions, initial_permission_assignments, initial_user_to_group_assignments + )