check permissions on process group and model list api endpoints w/ burnettk

This commit is contained in:
jasquat 2023-05-05 16:14:22 -04:00
parent 950d07f306
commit 1b97cbb5b2
6 changed files with 55 additions and 25 deletions

View File

@ -22,4 +22,3 @@ permissions:
users: []
allowed_permissions: [create, read, update, delete]
uri: /*

View File

@ -87,11 +87,7 @@ def process_group_update(modified_process_group_id: str, body: dict) -> flask.wr
def process_group_list(
process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Process_group_list."""
if process_group_identifier is not None:
process_groups = ProcessModelService.get_process_groups(process_group_identifier)
else:
process_groups = ProcessModelService.get_process_groups()
process_groups = ProcessModelService.get_process_groups_for_api(process_group_identifier)
batch = ProcessModelService().get_batch(items=process_groups, page=page, per_page=per_page)
pages = len(process_groups) // per_page
remainder = len(process_groups) % per_page

View File

@ -214,8 +214,7 @@ def process_model_list(
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
"""Process model list!"""
process_models = ProcessModelService.get_process_models(
process_models = ProcessModelService.get_process_models_for_api(
process_group_id=process_group_identifier,
recursive=recursive,
filter_runnable_by_user=filter_runnable_by_user,

View File

@ -577,6 +577,8 @@ class AuthorizationService:
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-groups"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-models"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks"))

View File

@ -187,9 +187,7 @@ class ProcessModelService(FileSystemService):
cls,
process_group_id: Optional[str] = None,
recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]:
"""Get process models."""
process_models = []
root_path = FileSystemService.root_path()
if process_group_id:
@ -205,22 +203,35 @@ class ProcessModelService(FileSystemService):
process_model = cls.get_process_model_from_relative_path(os.path.dirname(process_model_relative_path))
process_models.append(process_model)
process_models.sort()
if filter_runnable_by_user:
user = UserService.current_user()
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"/v1.0/process-instances/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission="create", target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
return process_models
@classmethod
def get_process_models_for_api(
cls,
process_group_id: Optional[str] = None,
recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]:
process_models = cls.get_process_models(process_group_id, recursive)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-models"
if filter_runnable_by_user:
permission_to_check = "create"
permission_base_uri = "/v1.0/process-instances"
user = UserService.current_user()
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"{permission_base_uri}/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
@classmethod
def get_parent_group_array_and_cache_it(
cls, process_identifier: str, process_group_cache: dict[str, ProcessGroup]
@ -256,6 +267,28 @@ class ProcessModelService(FileSystemService):
process_groups.sort()
return process_groups
@classmethod
def get_process_groups_for_api(
cls,
process_group_id: Optional[str] = None,
) -> List[ProcessGroup]:
process_groups = cls.get_process_groups(process_group_id)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-groups"
user = UserService.current_user()
new_process_group_list = []
for process_group in process_groups:
modified_process_group_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_group.id)
uri = f"{permission_base_uri}/{modified_process_group_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_group_list.append(process_group)
return new_process_group_list
@classmethod
def get_process_group(cls, process_group_id: str, find_direct_nested_items: bool = True) -> ProcessGroup:
"""Look for a given process_group, and return it."""

View File

@ -287,9 +287,9 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_basic."""
expected_permissions = [
("/active-users/*", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"),
("/process-instances/for-me", "create"),
("/process-instances/report-metadata", "read"),
@ -297,6 +297,7 @@ class TestAuthorizationService(BaseTest):
("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"),
("/process-models", "read"),
("/processes", "read"),
("/processes/callers", "read"),
("/service-tasks", "read"),