added ALL macro for easier use with admin groups and some failure test cases

This commit is contained in:
jasquat 2022-12-22 09:57:13 -05:00
parent c00252815d
commit 82d67bacc3
2 changed files with 87 additions and 12 deletions

View File

@ -47,6 +47,10 @@ class UserDoesNotHaveAccessToTaskError(Exception):
"""UserDoesNotHaveAccessToTaskError."""
class InvalidPermissionError(Exception):
pass
@dataclass
class PermissionToAssign:
permission: str
@ -531,22 +535,22 @@ class AuthorizationService:
return user_model # type: ignore
@classmethod
def get_permissions_to_assign(cls, permission: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]:
permissions = [permission]
if permission == "all":
def get_permissions_to_assign(cls, permission_set: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]:
permissions = permission_set.split(',')
if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete']
permissions_to_assign: list[PermissionToAssign] = []
# we were thinking that if you can start an instance, you ought to be able to view your own instances.
if permission == "start":
if permission_set == "start":
target_uri = f"/process-instances/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='create', target_uri=target_uri))
target_uri = f"/process-instances/for-me/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri=target_uri))
else:
if permission == 'all':
if permission_set == 'all':
for path_segment in PATH_SEGMENTS_FOR_PERMISSION_ALL:
target_uris.append(f"{path_segment}/{process_related_path_segment}")
@ -557,11 +561,13 @@ class AuthorizationService:
return permissions_to_assign
@classmethod
def explode_permissions(cls, permission: str, target: str) -> list[PermissionToAssign]:
def explode_permissions(cls, permission_set: str, target: str) -> list[PermissionToAssign]:
"""Explodes given permissions to and returns list of PermissionToAssign objects.
These can be used to then iterate through and inserted into the database.
Target Macros:
ALL
* gives access to ALL api endpoints - useful to give admin-like permissions
PG:[process_group_identifier]
* affects given process-group and all sub process-groups and process-models
PM:[process_model_identifier]
@ -570,17 +576,24 @@ class AuthorizationService:
* Basic access to complete tasks and use the site
Permission Macros:
all - create, read, update, delete
start - create process-instances (aka instantiate or start a process-model)
all
* create, read, update, delete
start
* create process-instances (aka instantiate or start a process-model)
* only works with PG and PM target macros
"""
permissions_to_assign: list[PermissionToAssign] = []
permissions = permission_set.split(',')
if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete']
if target.startswith("PG:"):
process_group_identifier = target.removeprefix("PG:").replace(":", "/").removeprefix('/')
process_related_path_segment = f"{process_group_identifier}/*"
if process_group_identifier == "ALL":
process_related_path_segment = "*"
target_uris = [f"/process-groups/{process_related_path_segment}", f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris)
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
elif target.startswith("PM:"):
process_model_identifier = target.removeprefix("PM:").replace(":", "/").removeprefix('/')
@ -590,7 +603,10 @@ class AuthorizationService:
process_related_path_segment = "*"
target_uris = [f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission, process_related_path_segment, target_uris)
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
elif permission_set == "start":
raise InvalidPermissionError("Permission 'start' is only available for macros PM and PG.")
elif target.startswith("BASIC"):
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/process-instances/for-me"))
@ -601,8 +617,17 @@ class AuthorizationService:
for permission in ['create', 'read', 'update', 'delete']:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/process-instances/reports/*"))
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/tasks/*"))
elif target == "ALL":
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri='/*'))
elif target.startswith('/'):
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target))
else:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target))
raise InvalidPermissionError(
f"Target uri '{target}' with permission set '{permission_set}' is invalid. "
f"The target uri must either be a macro of PG, PM, BASIC, or ALL or an api uri."
)
return permissions_to_assign

View File

@ -6,7 +6,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import AuthorizationService, InvalidPermissionError
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -272,3 +272,53 @@ class TestAuthorizationService(BaseTest):
permissions_to_assign = AuthorizationService.explode_permissions('all', 'BASIC')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_all(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
('/*', 'create'),
('/*', 'delete'),
('/*', 'read'),
('/*', 'update'),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'ALL')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_target_uri(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
('/hey/model', 'create'),
('/hey/model', 'delete'),
('/hey/model', 'read'),
('/hey/model', 'update'),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', '/hey/model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_invalid_target_uri(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('all', 'BAD_MACRO')
def test_explode_permissions_with_start_to_incorrect_target(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('start', '/hey/model')