added ALL macro for easier use with admin groups and some failure test cases

This commit is contained in:
jasquat 2022-12-22 09:57:13 -05:00
parent 2519c9f952
commit 1f6f20a734
2 changed files with 87 additions and 12 deletions

View File

@ -47,6 +47,10 @@ class UserDoesNotHaveAccessToTaskError(Exception):
"""UserDoesNotHaveAccessToTaskError.""" """UserDoesNotHaveAccessToTaskError."""
class InvalidPermissionError(Exception):
pass
@dataclass @dataclass
class PermissionToAssign: class PermissionToAssign:
permission: str permission: str
@ -531,22 +535,22 @@ class AuthorizationService:
return user_model # type: ignore return user_model # type: ignore
@classmethod @classmethod
def get_permissions_to_assign(cls, permission: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]: def get_permissions_to_assign(cls, permission_set: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]:
permissions = [permission] permissions = permission_set.split(',')
if permission == "all": if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete'] permissions = ['create', 'read', 'update', 'delete']
permissions_to_assign: list[PermissionToAssign] = [] permissions_to_assign: list[PermissionToAssign] = []
# we were thinking that if you can start an instance, you ought to be able to view your own instances. # we were thinking that if you can start an instance, you ought to be able to view your own instances.
if permission == "start": if permission_set == "start":
target_uri = f"/process-instances/{process_related_path_segment}" target_uri = f"/process-instances/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='create', target_uri=target_uri)) permissions_to_assign.append(PermissionToAssign(permission='create', target_uri=target_uri))
target_uri = f"/process-instances/for-me/{process_related_path_segment}" target_uri = f"/process-instances/for-me/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri=target_uri)) permissions_to_assign.append(PermissionToAssign(permission='read', target_uri=target_uri))
else: else:
if permission == 'all': if permission_set == 'all':
for path_segment in PATH_SEGMENTS_FOR_PERMISSION_ALL: for path_segment in PATH_SEGMENTS_FOR_PERMISSION_ALL:
target_uris.append(f"{path_segment}/{process_related_path_segment}") target_uris.append(f"{path_segment}/{process_related_path_segment}")
@ -557,11 +561,13 @@ class AuthorizationService:
return permissions_to_assign return permissions_to_assign
@classmethod @classmethod
def explode_permissions(cls, permission: str, target: str) -> list[PermissionToAssign]: def explode_permissions(cls, permission_set: str, target: str) -> list[PermissionToAssign]:
"""Explodes given permissions to and returns list of PermissionToAssign objects. """Explodes given permissions to and returns list of PermissionToAssign objects.
These can be used to then iterate through and inserted into the database. These can be used to then iterate through and inserted into the database.
Target Macros: Target Macros:
ALL
* gives access to ALL api endpoints - useful to give admin-like permissions
PG:[process_group_identifier] PG:[process_group_identifier]
* affects given process-group and all sub process-groups and process-models * affects given process-group and all sub process-groups and process-models
PM:[process_model_identifier] PM:[process_model_identifier]
@ -570,17 +576,24 @@ class AuthorizationService:
* Basic access to complete tasks and use the site * Basic access to complete tasks and use the site
Permission Macros: Permission Macros:
all - create, read, update, delete all
start - create process-instances (aka instantiate or start a process-model) * create, read, update, delete
start
* create process-instances (aka instantiate or start a process-model)
* only works with PG and PM target macros
""" """
permissions_to_assign: list[PermissionToAssign] = [] permissions_to_assign: list[PermissionToAssign] = []
permissions = permission_set.split(',')
if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete']
if target.startswith("PG:"): if target.startswith("PG:"):
process_group_identifier = target.removeprefix("PG:").replace(":", "/").removeprefix('/') process_group_identifier = target.removeprefix("PG:").replace(":", "/").removeprefix('/')
process_related_path_segment = f"{process_group_identifier}/*" process_related_path_segment = f"{process_group_identifier}/*"
if process_group_identifier == "ALL": if process_group_identifier == "ALL":
process_related_path_segment = "*" process_related_path_segment = "*"
target_uris = [f"/process-groups/{process_related_path_segment}", f"/process-models/{process_related_path_segment}"] target_uris = [f"/process-groups/{process_related_path_segment}", f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris) permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
elif target.startswith("PM:"): elif target.startswith("PM:"):
process_model_identifier = target.removeprefix("PM:").replace(":", "/").removeprefix('/') process_model_identifier = target.removeprefix("PM:").replace(":", "/").removeprefix('/')
@ -590,7 +603,10 @@ class AuthorizationService:
process_related_path_segment = "*" process_related_path_segment = "*"
target_uris = [f"/process-models/{process_related_path_segment}"] target_uris = [f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris) permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
elif permission_set == "start":
raise InvalidPermissionError("Permission 'start' is only available for macros PM and PG.")
elif target.startswith("BASIC"): elif target.startswith("BASIC"):
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/process-instances/for-me")) permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/process-instances/for-me"))
@ -601,8 +617,17 @@ class AuthorizationService:
for permission in ['create', 'read', 'update', 'delete']: for permission in ['create', 'read', 'update', 'delete']:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/process-instances/reports/*")) permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/process-instances/reports/*"))
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/tasks/*")) permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/tasks/*"))
elif target == "ALL":
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri='/*'))
elif target.startswith('/'):
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target))
else: else:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target)) raise InvalidPermissionError(
f"Target uri '{target}' with permission set '{permission_set}' is invalid. "
f"The target uri must either be a macro of PG, PM, BASIC, or ALL or an api uri."
)
return permissions_to_assign return permissions_to_assign

View File

@ -6,7 +6,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService, InvalidPermissionError
from spiffworkflow_backend.services.process_instance_processor import ( from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor, ProcessInstanceProcessor,
) )
@ -272,3 +272,53 @@ class TestAuthorizationService(BaseTest):
permissions_to_assign = AuthorizationService.explode_permissions('all', 'BASIC') permissions_to_assign = AuthorizationService.explode_permissions('all', 'BASIC')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign]) permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_all(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
('/*', 'create'),
('/*', 'delete'),
('/*', 'read'),
('/*', 'update'),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'ALL')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_target_uri(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
('/hey/model', 'create'),
('/hey/model', 'delete'),
('/hey/model', 'read'),
('/hey/model', 'update'),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', '/hey/model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_invalid_target_uri(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('all', 'BAD_MACRO')
def test_explode_permissions_with_start_to_incorrect_target(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('start', '/hey/model')