ensure we do not delete user group assignments from regexes w/ burnettk (#690)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-11-20 11:27:28 -05:00 committed by GitHub
parent f524b78368
commit 91ee554543
4 changed files with 56 additions and 30 deletions

View File

@ -2,6 +2,9 @@ from typing import TYPE_CHECKING
from typing import NewType
from typing import TypedDict
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.user_group_assignment_waiting import UserGroupAssignmentWaitingModel
if TYPE_CHECKING:
from spiffworkflow_backend.models.process_group import ProcessGroup
@ -17,3 +20,26 @@ class ProcessGroupLite(TypedDict):
class ProcessGroupLitesWithCache(TypedDict):
cache: dict[str, "ProcessGroup"]
process_groups: list[ProcessGroupLite]
class UserToGroupDict(TypedDict):
username: str
group_identifier: str
class AddedPermissionDict(TypedDict):
group_identifiers: set[str]
permission_assignments: list[PermissionAssignmentModel]
user_to_group_identifiers: list[UserToGroupDict]
waiting_user_group_assignments: list[UserGroupAssignmentWaitingModel]
class DesiredGroupPermissionDict(TypedDict):
actions: list[str]
uri: str
class GroupPermissionsDict(TypedDict):
users: list[str]
name: str
permissions: list[DesiredGroupPermissionDict]

View File

@ -1,7 +1,6 @@
import inspect
import re
from dataclasses import dataclass
from typing import TypedDict
import yaml
from flask import current_app
@ -16,6 +15,9 @@ from spiffworkflow_backend.exceptions.error import PermissionsFileNotSetError
from spiffworkflow_backend.exceptions.error import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.exceptions.error import UserNotLoggedInError
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
from spiffworkflow_backend.interfaces import AddedPermissionDict
from spiffworkflow_backend.interfaces import GroupPermissionsDict
from spiffworkflow_backend.interfaces import UserToGroupDict
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.group import SPIFF_GUEST_GROUP
from spiffworkflow_backend.models.group import GroupModel
@ -75,29 +77,6 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
]
class UserToGroupDict(TypedDict):
username: str
group_identifier: str
class AddedPermissionDict(TypedDict):
group_identifiers: set[str]
permission_assignments: list[PermissionAssignmentModel]
user_to_group_identifiers: list[UserToGroupDict]
waiting_user_group_assignments: list[UserGroupAssignmentWaitingModel]
class DesiredGroupPermissionDict(TypedDict):
actions: list[str]
uri: str
class GroupPermissionsDict(TypedDict):
users: list[str]
name: str
permissions: list[DesiredGroupPermissionDict]
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@ -675,6 +654,8 @@ class AuthorizationService:
* only works with PG and PM target macros
"""
permissions_to_assign: list[PermissionToAssign] = []
if "," in permission_set:
raise Exception(f"Found comma in permission_set. This should be an array instead: {permission_set}")
permissions = permission_set.split(",")
if permission_set == "all":
permissions = ["create", "read", "update", "delete"]
@ -815,9 +796,13 @@ class AuthorizationService:
"group_identifier": group_identifier,
}
user_to_group_identifiers.append(user_to_group_dict)
wugam = UserService.add_user_to_group_or_add_to_waiting(username, group_identifier)
(wugam, new_user_to_group_identifiers) = UserService.add_user_to_group_or_add_to_waiting(
username, group_identifier
)
if wugam is not None:
waiting_user_group_assignments.append(wugam)
if new_user_to_group_identifiers is not None:
user_to_group_identifiers = user_to_group_identifiers + new_user_to_group_identifiers
unique_user_group_identifiers.add(group_identifier)
for group in group_permissions:

View File

@ -4,6 +4,7 @@ from typing import Any
from flask import current_app
from flask import g
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.interfaces import UserToGroupDict
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.group import SPIFF_GUEST_GROUP
from spiffworkflow_backend.models.group import GroupModel
@ -126,7 +127,9 @@ class UserService:
db.session.commit()
@classmethod
def add_waiting_group_assignment(cls, username: str, group: GroupModel) -> UserGroupAssignmentWaitingModel:
def add_waiting_group_assignment(
cls, username: str, group: GroupModel
) -> tuple[UserGroupAssignmentWaitingModel, list[UserToGroupDict]]:
"""Only called from set-permissions."""
wugam: UserGroupAssignmentWaitingModel | None = (
UserGroupAssignmentWaitingModel().query.filter_by(username=username).filter_by(group_id=group.id).first()
@ -138,12 +141,14 @@ class UserService:
# backfill existing users
wildcard_pattern = wugam.pattern_from_wildcard_username()
user_to_group_identifiers: list[UserToGroupDict] = []
if wildcard_pattern is not None:
users = UserModel.query.filter(UserModel.username.regexp_match(wildcard_pattern)) # type: ignore
for user in users:
cls.add_user_to_group(user, group)
user_to_group_identifiers.append({"username": user.username, "group_identifier": group.identifier})
return wugam
return (wugam, user_to_group_identifiers)
@classmethod
def apply_waiting_group_assignments(cls, user: UserModel) -> None:
@ -236,14 +241,14 @@ class UserService:
@classmethod
def add_user_to_group_or_add_to_waiting(
cls, username: str | UserModel, group_identifier: str
) -> UserGroupAssignmentWaitingModel | None:
) -> tuple[UserGroupAssignmentWaitingModel | None, list[UserToGroupDict] | None]:
group = cls.find_or_create_group(group_identifier)
user = UserModel.query.filter_by(username=username).first()
if user:
cls.add_user_to_group(user, group)
else:
return cls.add_waiting_group_assignment(username, group)
return None
return (None, None)
@classmethod
def add_user_to_group_by_group_identifier(cls, user: UserModel, group_identifier: str) -> None:

View File

@ -557,9 +557,20 @@ class TestAuthorizationService(BaseTest):
waiting_assignments = UserGroupAssignmentWaitingModel.query.filter_by(username=user_regex).all()
assert len(waiting_assignments) == 1
assert waiting_assignments[0].username == user_regex
assert len(user.groups) == 2
assert "group_one" in [g.identifier for g in user.groups]
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey", expected_result=False)
# run again to ensure the user does NOT lose permission when refreshing permissions again
AuthorizationService.refresh_permissions(group_info)
waiting_assignments = UserGroupAssignmentWaitingModel.query.filter_by(username=user_regex).all()
assert len(waiting_assignments) == 1
assert waiting_assignments[0].username == user_regex
assert len(user.groups) == 2
assert "group_one" in [g.identifier for g in user.groups]
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
user_three_dict = {
"username": "user_three",
"email": "user_three@example.com",
@ -621,4 +632,3 @@ class TestAuthorizationService(BaseTest):
# test it can be permitted again
AuthorizationService.add_permission_from_uri_or_macro(user_group.identifier, "read", "PG:hey:yo")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo", expected_result=True)
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo:me", expected_result=True)