Merge pull request #244 from sartography/feature/check_permissions_on_group_and_model_list

Feature/check permissions on group and model list
This commit is contained in:
jasquat 2023-05-08 14:29:39 -04:00 committed by GitHub
commit 26310da412
6 changed files with 156 additions and 29 deletions

View File

@ -87,11 +87,7 @@ def process_group_update(modified_process_group_id: str, body: dict) -> flask.wr
def process_group_list( def process_group_list(
process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100 process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process_group_list.""" process_groups = ProcessModelService.get_process_groups_for_api(process_group_identifier)
if process_group_identifier is not None:
process_groups = ProcessModelService.get_process_groups(process_group_identifier)
else:
process_groups = ProcessModelService.get_process_groups()
batch = ProcessModelService().get_batch(items=process_groups, page=page, per_page=per_page) batch = ProcessModelService().get_batch(items=process_groups, page=page, per_page=per_page)
pages = len(process_groups) // per_page pages = len(process_groups) // per_page
remainder = len(process_groups) % per_page remainder = len(process_groups) % per_page

View File

@ -214,8 +214,7 @@ def process_model_list(
page: int = 1, page: int = 1,
per_page: int = 100, per_page: int = 100,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
"""Process model list!""" process_models = ProcessModelService.get_process_models_for_api(
process_models = ProcessModelService.get_process_models(
process_group_id=process_group_identifier, process_group_id=process_group_identifier,
recursive=recursive, recursive=recursive,
filter_runnable_by_user=filter_runnable_by_user, filter_runnable_by_user=filter_runnable_by_user,

View File

@ -577,6 +577,8 @@ class AuthorizationService:
permissions_to_assign.append( permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata") PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
) )
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-groups"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-models"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes")) permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers")) permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks")) permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks"))

View File

@ -2,6 +2,7 @@
import json import json
import os import os
import shutil import shutil
import uuid
from glob import glob from glob import glob
from typing import Any from typing import Any
from typing import List from typing import List
@ -187,9 +188,7 @@ class ProcessModelService(FileSystemService):
cls, cls,
process_group_id: Optional[str] = None, process_group_id: Optional[str] = None,
recursive: Optional[bool] = False, recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]: ) -> List[ProcessModelInfo]:
"""Get process models."""
process_models = [] process_models = []
root_path = FileSystemService.root_path() root_path = FileSystemService.root_path()
if process_group_id: if process_group_id:
@ -205,22 +204,45 @@ class ProcessModelService(FileSystemService):
process_model = cls.get_process_model_from_relative_path(os.path.dirname(process_model_relative_path)) process_model = cls.get_process_model_from_relative_path(os.path.dirname(process_model_relative_path))
process_models.append(process_model) process_models.append(process_model)
process_models.sort() process_models.sort()
if filter_runnable_by_user:
user = UserService.current_user()
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"/v1.0/process-instances/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission="create", target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
return process_models return process_models
@classmethod
def get_process_models_for_api(
cls,
process_group_id: Optional[str] = None,
recursive: Optional[bool] = False,
filter_runnable_by_user: Optional[bool] = False,
) -> List[ProcessModelInfo]:
process_models = cls.get_process_models(process_group_id, recursive)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-models"
user = UserService.current_user()
if filter_runnable_by_user:
permission_to_check = "create"
permission_base_uri = "/v1.0/process-instances"
# if user has access to uri/* with that permission then there's no reason to check each one individually
guid_of_non_existent_item_to_check_perms_against = str(uuid.uuid4())
has_permission = AuthorizationService.user_has_permission(
user=user,
permission=permission_to_check,
target_uri=f"{permission_base_uri}/{guid_of_non_existent_item_to_check_perms_against}",
)
if has_permission:
return process_models
new_process_model_list = []
for process_model in process_models:
modified_process_model_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_model.id)
uri = f"{permission_base_uri}/{modified_process_model_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_model_list.append(process_model)
return new_process_model_list
@classmethod @classmethod
def get_parent_group_array_and_cache_it( def get_parent_group_array_and_cache_it(
cls, process_identifier: str, process_group_cache: dict[str, ProcessGroup] cls, process_identifier: str, process_group_cache: dict[str, ProcessGroup]
@ -256,6 +278,38 @@ class ProcessModelService(FileSystemService):
process_groups.sort() process_groups.sort()
return process_groups return process_groups
@classmethod
def get_process_groups_for_api(
cls,
process_group_id: Optional[str] = None,
) -> List[ProcessGroup]:
process_groups = cls.get_process_groups(process_group_id)
permission_to_check = "read"
permission_base_uri = "/v1.0/process-groups"
user = UserService.current_user()
# if user has access to uri/* with that permission then there's no reason to check each one individually
guid_of_non_existent_item_to_check_perms_against = str(uuid.uuid4())
has_permission = AuthorizationService.user_has_permission(
user=user,
permission=permission_to_check,
target_uri=f"{permission_base_uri}/{guid_of_non_existent_item_to_check_perms_against}",
)
if has_permission:
return process_groups
new_process_group_list = []
for process_group in process_groups:
modified_process_group_id = ProcessModelInfo.modify_process_identifier_for_path_param(process_group.id)
uri = f"{permission_base_uri}/{modified_process_group_id}"
has_permission = AuthorizationService.user_has_permission(
user=user, permission=permission_to_check, target_uri=uri
)
if has_permission:
new_process_group_list.append(process_group)
return new_process_group_list
@classmethod @classmethod
def get_process_group(cls, process_group_id: str, find_direct_nested_items: bool = True) -> ProcessGroup: def get_process_group(cls, process_group_id: str, find_direct_nested_items: bool = True) -> ProcessGroup:
"""Look for a given process_group, and return it.""" """Look for a given process_group, and return it."""

View File

@ -702,7 +702,6 @@ class TestProcessApi(BaseTest):
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_process_group_list."""
# add 5 groups # add 5 groups
for i in range(5): for i in range(5):
group_id = f"test_process_group_{i}" group_id = f"test_process_group_{i}"
@ -997,14 +996,13 @@ class TestProcessApi(BaseTest):
assert response.json is not None assert response.json is not None
assert "test_group/random_fact" == response.json["process_model_identifier"] assert "test_group/random_fact" == response.json["process_model_identifier"]
def test_get_process_groups_when_none( def test_process_group_list_when_none(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_get_process_groups_when_none."""
response = client.get( response = client.get(
"/v1.0/process-groups", "/v1.0/process-groups",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
@ -1013,14 +1011,13 @@ class TestProcessApi(BaseTest):
assert response.json is not None assert response.json is not None
assert response.json["results"] == [] assert response.json["results"] == []
def test_get_process_groups_when_there_are_some( def test_process_group_list_when_there_are_some(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel, with_super_admin_user: UserModel,
) -> None: ) -> None:
"""Test_get_process_groups_when_there_are_some."""
self.create_group_and_model_with_bpmn(client, with_super_admin_user) self.create_group_and_model_with_bpmn(client, with_super_admin_user)
response = client.get( response = client.get(
"/v1.0/process-groups", "/v1.0/process-groups",
@ -1033,6 +1030,84 @@ class TestProcessApi(BaseTest):
assert response.json["pagination"]["total"] == 1 assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1 assert response.json["pagination"]["pages"] == 1
def test_process_group_list_when_user_has_resticted_access(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="admin_only", process_model_id="random_fact"
)
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="all_users", process_model_id="hello_world"
)
user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-groups/all_users:*")
self.add_permissions_to_user(user=user_one, target_uri="/v1.0/process-groups", permission_names=["read"])
response = client.get(
"/v1.0/process-groups",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
assert response.json["pagination"]["count"] == 2
assert response.json["pagination"]["total"] == 2
assert response.json["pagination"]["pages"] == 1
response = client.get(
"/v1.0/process-groups",
headers=self.logged_in_headers(user_one),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert response.json["results"][0]["id"] == "all_users"
assert response.json["pagination"]["count"] == 1
assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1
def test_process_model_list_when_user_has_resticted_access(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="admin_only", process_model_id="random_fact"
)
self.create_group_and_model_with_bpmn(
client, with_super_admin_user, process_group_id="all_users", process_model_id="hello_world"
)
user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-models/all_users:*")
self.add_permissions_to_user(user=user_one, target_uri="/v1.0/process-models", permission_names=["read"])
response = client.get(
"/v1.0/process-models?recursive=true",
headers=self.logged_in_headers(with_super_admin_user),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 2
assert response.json["pagination"]["count"] == 2
assert response.json["pagination"]["total"] == 2
assert response.json["pagination"]["pages"] == 1
response = client.get(
"/v1.0/process-models?recursive=true",
headers=self.logged_in_headers(user_one),
)
assert response.status_code == 200
assert response.json is not None
assert len(response.json["results"]) == 1
assert response.json["results"][0]["id"] == "all_users/hello_world"
assert response.json["pagination"]["count"] == 1
assert response.json["pagination"]["total"] == 1
assert response.json["pagination"]["pages"] == 1
def test_get_process_group_when_found( def test_get_process_group_when_found(
self, self,
app: Flask, app: Flask,

View File

@ -287,9 +287,9 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_explode_permissions_basic."""
expected_permissions = [ expected_permissions = [
("/active-users/*", "read"), ("/active-users/*", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"), ("/process-instances/find-by-id/*", "read"),
("/process-instances/for-me", "create"), ("/process-instances/for-me", "create"),
("/process-instances/report-metadata", "read"), ("/process-instances/report-metadata", "read"),
@ -297,6 +297,7 @@ class TestAuthorizationService(BaseTest):
("/process-instances/reports/*", "delete"), ("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"), ("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"), ("/process-instances/reports/*", "update"),
("/process-models", "read"),
("/processes", "read"), ("/processes", "read"),
("/processes/callers", "read"), ("/processes/callers", "read"),
("/service-tasks", "read"), ("/service-tasks", "read"),