Merge pull request #244 from sartography/feature/check_permissions_on_group_and_model_list

Feature/check permissions on group and model list
This commit is contained in:
jasquat 2023-05-08 14:29:39 -04:00 committed by GitHub
commit 5de162f789
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 156 additions and 29 deletions

View File

@ -87,11 +87,7 @@ def process_group_update(modified_process_group_id: str, body: dict) -> flask.wr
def process_group_list(
process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Process_group_list."""
if process_group_identifier is not None:
process_groups = ProcessModelService.get_process_groups(process_group_identifier)
else:
process_groups = ProcessModelService.get_process_groups()
process_groups = ProcessModelService.get_process_groups_for_api(process_group_identifier)
batch = ProcessModelService().get_batch(items=process_groups, page=page, per_page=per_page)
pages = len(process_groups) // per_page
remainder = len(process_groups) % per_page

View File

@ -214,8 +214,7 @@ def process_model_list(
page: int = 1,
per_page: int = 100,
) -> flask.wrappers.Response:
"""Process model list!"""
process_models = ProcessModelService.get_process_models(
process_models = ProcessModelService.get_process_models_for_api(
process_group_id=process_group_identifier,
recursive=recursive,
filter_runnable_by_user=filter_runnable_by_user,

View File

@ -577,6 +577,8 @@ class AuthorizationService:
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-groups"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-models"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks"))

View File

@ -2,6 +2,7 @@
import json
import os
import shutil
import uuid
from glob import glob
from typing import Any
from typing import List
@ -187,9 +188,7 @@ class ProcessModelService(FileSystemService):
cls,
process_group_id: Optional[str] = None,
recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]:
"""Get process models."""
process_models = []
root_path = FileSystemService.root_path()
if process_group_id:
@ -205,22 +204,45 @@ class ProcessModelService(FileSystemService):
process_model = cls.get_process_model_from_relative_path(os.path.dirname(process_model_relative_path))
process_models.append(process_model)
process_models.sort()
if filter_runnable_by_user:
user = UserService.current_user()
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"/v1.0/process-instances/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission="create", target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
return process_models
@classmethod
def get_process_models_for_api(
cls,
process_group_id: Optional[str] = None,
recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]:
process_models = cls.get_process_models(process_group_id, recursive)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-models"
user = UserService.current_user()
if filter_runnable_by_user:
permission_to_check = "create"
permission_base_uri = "/v1.0/process-instances"
# if user has access to uri/* with that permission then there's no reason to check each one individually
guid_of_non_existent_item_to_check_perms_against = str(uuid.uuid4())
has_permission = AuthorizationService.user_has_permission(
user=user,
permission=permission_to_check,
target_uri=f"{permission_base_uri}/{guid_of_non_existent_item_to_check_perms_against}",
)
if has_permission:
return process_models
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"{permission_base_uri}/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
@classmethod
def get_parent_group_array_and_cache_it(
cls, process_identifier: str, process_group_cache: dict[str, ProcessGroup]
@ -256,6 +278,38 @@ class ProcessModelService(FileSystemService):
process_groups.sort()
return process_groups
@classmethod
def get_process_groups_for_api(
cls,
process_group_id: Optional[str] = None,
) -> List[ProcessGroup]:
process_groups = cls.get_process_groups(process_group_id)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-groups"
user = UserService.current_user()
# if user has access to uri/* with that permission then there's no reason to check each one individually
guid_of_non_existent_item_to_check_perms_against = str(uuid.uuid4())
has_permission = AuthorizationService.user_has_permission(
user=user,
permission=permission_to_check,
target_uri=f"{permission_base_uri}/{guid_of_non_existent_item_to_check_perms_against}",
)
if has_permission:
return process_groups
new_process_group_list = []
for process_group in process_groups:
modified_process_group_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_group.id)
uri = f"{permission_base_uri}/{modified_process_group_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_group_list.append(process_group)
return new_process_group_list
@classmethod
def get_process_group(cls, process_group_id: str, find_direct_nested_items: bool = True) -> ProcessGroup:
"""Look for a given process_group, and return it."""

View File

@ -702,7 +702,6 @@ class TestProcessApi(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_process_group_list."""
# add 5 groups
for i in range(5):
group_id = f"test_process_group_{i}"
@ -997,14 +996,13 @@ class TestProcessApi(BaseTest):
assert response.json is not None
assert "test_group/random_fact" == response.json["process_model_identifier"]
def test_get_process_groups_when_none(
def test_process_group_list_when_none(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_get_process_groups_when_none."""
response = client.get(
"/v1.0/process-groups",
headers=self.logged_in_headers(with_super_admin_user),
@ -1013,14 +1011,13 @@ class TestProcessApi(BaseTest):
assert response.json is not None
assert response.json["results"] == []
def test_get_process_groups_when_there_are_some(
def test_process_group_list_when_there_are_some(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_get_process_groups_when_there_are_some."""
self.create_group_and_model_with_bpmn(client, with_super_admin_user)
response = client.get(
"/v1.0/process-groups",
@ -1033,6 +1030,84 @@ class TestProcessApi(BaseTest):
assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1
def test_process_group_list_when_user_has_resticted_access(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="admin_only", process_model_id="random_fact"
)
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="all_users", process_model_id="hello_world"
)
user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-groups/all_users:*")
self.add_permissions_to_user(user=user_one, target_uri="/v1.0/process-groups", permission_names=["read"])
response = client.get(
"/v1.0/process-groups",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
assert response.json["pagination"]["count"] == 2
assert response.json["pagination"]["total"] == 2
assert response.json["pagination"]["pages"] == 1
response = client.get(
"/v1.0/process-groups",
headers=self.logged_in_headers(user_one),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert response.json["results"][0]["id"] == "all_users"
assert response.json["pagination"]["count"] == 1
assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1
def test_process_model_list_when_user_has_resticted_access(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="admin_only", process_model_id="random_fact"
)
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="all_users", process_model_id="hello_world"
)
user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-models/all_users:*")
self.add_permissions_to_user(user=user_one, target_uri="/v1.0/process-models", permission_names=["read"])
response = client.get(
"/v1.0/process-models?recursive=true",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
assert response.json["pagination"]["count"] == 2
assert response.json["pagination"]["total"] == 2
assert response.json["pagination"]["pages"] == 1
response = client.get(
"/v1.0/process-models?recursive=true",
headers=self.logged_in_headers(user_one),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert response.json["results"][0]["id"] == "all_users/hello_world"
assert response.json["pagination"]["count"] == 1
assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1
def test_get_process_group_when_found(
self,
app: Flask,

View File

@ -287,9 +287,9 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_basic."""
expected_permissions = [
("/active-users/*", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"),
("/process-instances/for-me", "create"),
("/process-instances/report-metadata", "read"),
@ -297,6 +297,7 @@ class TestAuthorizationService(BaseTest):
("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"),
("/process-models", "read"),
("/processes", "read"),
("/processes/callers", "read"),
("/service-tasks", "read"),