This commit is contained in:
jasquat 2022-12-22 09:59:55 -05:00
parent 82d67bacc3
commit b2885159bd
3 changed files with 278 additions and 129 deletions

View File

@ -1,7 +1,7 @@
"""Authorization_service."""
from dataclasses import dataclass
import inspect
import re
from dataclasses import dataclass
from hashlib import sha256
from hmac import compare_digest
from hmac import HMAC
@ -48,21 +48,23 @@ class UserDoesNotHaveAccessToTaskError(Exception):
class InvalidPermissionError(Exception):
pass
"""InvalidPermissionError."""
@dataclass
class PermissionToAssign:
"""PermissionToAssign."""
permission: str
target_uri: str
PATH_SEGMENTS_FOR_PERMISSION_ALL = [
'/logs',
'/process-instances',
'/process-instance-suspend',
'/process-instance-terminate',
'/task-data',
"/logs",
"/process-instances",
"/process-instance-suspend",
"/process-instance-terminate",
"/task-data",
]
@ -535,33 +537,47 @@ class AuthorizationService:
return user_model # type: ignore
@classmethod
def get_permissions_to_assign(cls, permission_set: str, process_related_path_segment: str, target_uris: list[str]) -> list[PermissionToAssign]:
permissions = permission_set.split(',')
def get_permissions_to_assign(
cls,
permission_set: str,
process_related_path_segment: str,
target_uris: list[str],
) -> list[PermissionToAssign]:
"""Get_permissions_to_assign."""
permissions = permission_set.split(",")
if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete']
permissions = ["create", "read", "update", "delete"]
permissions_to_assign: list[PermissionToAssign] = []
# we were thinking that if you can start an instance, you ought to be able to view your own instances.
if permission_set == "start":
target_uri = f"/process-instances/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='create', target_uri=target_uri))
permissions_to_assign.append(
PermissionToAssign(permission="create", target_uri=target_uri)
)
target_uri = f"/process-instances/for-me/{process_related_path_segment}"
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri=target_uri))
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri=target_uri)
)
else:
if permission_set == 'all':
if permission_set == "all":
for path_segment in PATH_SEGMENTS_FOR_PERMISSION_ALL:
target_uris.append(f"{path_segment}/{process_related_path_segment}")
for target_uri in target_uris:
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target_uri))
permissions_to_assign.append(
PermissionToAssign(permission=permission, target_uri=target_uri)
)
return permissions_to_assign
@classmethod
def explode_permissions(cls, permission_set: str, target: str) -> list[PermissionToAssign]:
def explode_permissions(
cls, permission_set: str, target: str
) -> list[PermissionToAssign]:
"""Explodes given permissions to and returns list of PermissionToAssign objects.
These can be used to then iterate through and inserted into the database.
@ -583,46 +599,87 @@ class AuthorizationService:
* only works with PG and PM target macros
"""
permissions_to_assign: list[PermissionToAssign] = []
permissions = permission_set.split(',')
permissions = permission_set.split(",")
if permission_set == "all":
permissions = ['create', 'read', 'update', 'delete']
permissions = ["create", "read", "update", "delete"]
if target.startswith("PG:"):
process_group_identifier = target.removeprefix("PG:").replace(":", "/").removeprefix('/')
process_group_identifier = (
target.removeprefix("PG:").replace(":", "/").removeprefix("/")
)
process_related_path_segment = f"{process_group_identifier}/*"
if process_group_identifier == "ALL":
process_related_path_segment = "*"
target_uris = [f"/process-groups/{process_related_path_segment}", f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
target_uris = [
f"/process-groups/{process_related_path_segment}",
f"/process-models/{process_related_path_segment}",
]
permissions_to_assign = (
permissions_to_assign
+ cls.get_permissions_to_assign(
permission_set, process_related_path_segment, target_uris
)
)
elif target.startswith("PM:"):
process_model_identifier = target.removeprefix("PM:").replace(":", "/").removeprefix('/')
process_model_identifier = (
target.removeprefix("PM:").replace(":", "/").removeprefix("/")
)
process_related_path_segment = f"{process_model_identifier}/*"
if process_model_identifier == "ALL":
process_related_path_segment = "*"
target_uris = [f"/process-models/{process_related_path_segment}"]
permissions_to_assign = permissions_to_assign + cls.get_permissions_to_assign(permission_set, process_related_path_segment, target_uris)
permissions_to_assign = (
permissions_to_assign
+ cls.get_permissions_to_assign(
permission_set, process_related_path_segment, target_uris
)
)
elif permission_set == "start":
raise InvalidPermissionError("Permission 'start' is only available for macros PM and PG.")
raise InvalidPermissionError(
"Permission 'start' is only available for macros PM and PG."
)
elif target.startswith("BASIC"):
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/process-instances/for-me"))
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/processes"))
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/service-tasks"))
permissions_to_assign.append(PermissionToAssign(permission='read', target_uri="/user-groups/for-current-user"))
permissions_to_assign.append(
PermissionToAssign(
permission="read", target_uri="/process-instances/for-me"
)
)
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/processes")
)
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/service-tasks")
)
permissions_to_assign.append(
PermissionToAssign(
permission="read", target_uri="/user-groups/for-current-user"
)
)
for permission in ['create', 'read', 'update', 'delete']:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/process-instances/reports/*"))
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri="/tasks/*"))
for permission in ["create", "read", "update", "delete"]:
permissions_to_assign.append(
PermissionToAssign(
permission=permission, target_uri="/process-instances/reports/*"
)
)
permissions_to_assign.append(
PermissionToAssign(permission=permission, target_uri="/tasks/*")
)
elif target == "ALL":
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri='/*'))
elif target.startswith('/'):
permissions_to_assign.append(
PermissionToAssign(permission=permission, target_uri="/*")
)
elif target.startswith("/"):
for permission in permissions:
permissions_to_assign.append(PermissionToAssign(permission=permission, target_uri=target))
permissions_to_assign.append(
PermissionToAssign(permission=permission, target_uri=target)
)
else:
raise InvalidPermissionError(
f"Target uri '{target}' with permission set '{permission_set}' is invalid. "
@ -632,12 +689,16 @@ class AuthorizationService:
return permissions_to_assign
@classmethod
def add_permission_from_uri_or_macro(cls, group_identifier: str, permission: str, target: str) -> None:
def add_permission_from_uri_or_macro(
cls, group_identifier: str, permission: str, target: str
) -> None:
"""Add_permission_from_uri_or_macro."""
group = GroupService.find_or_create_group(group_identifier)
permissions_to_assign = cls.explode_permissions(permission, target)
for permission_to_assign in permissions_to_assign:
permission_target = AuthorizationService.find_or_create_permission_target(permission_to_assign.target_uri)
permission_target = AuthorizationService.find_or_create_permission_target(
permission_to_assign.target_uri
)
AuthorizationService.create_permission_for_principal(
group.principal, permission_target, permission_to_assign.permission
)

View File

@ -6,7 +6,8 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService, InvalidPermissionError
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import InvalidPermissionError
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -151,38 +152,67 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_all_on_process_group."""
expected_permissions = [
('/logs/some-process-group/some-process-model/*', 'create'),
('/logs/some-process-group/some-process-model/*', 'delete'),
('/logs/some-process-group/some-process-model/*', 'read'),
('/logs/some-process-group/some-process-model/*', 'update'),
('/process-groups/some-process-group/some-process-model/*', 'create'),
('/process-groups/some-process-group/some-process-model/*', 'delete'),
('/process-groups/some-process-group/some-process-model/*', 'read'),
('/process-groups/some-process-group/some-process-model/*', 'update'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'create'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'delete'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'read'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'update'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'create'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'delete'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'read'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'update'),
('/process-instances/some-process-group/some-process-model/*', 'create'),
('/process-instances/some-process-group/some-process-model/*', 'delete'),
('/process-instances/some-process-group/some-process-model/*', 'read'),
('/process-instances/some-process-group/some-process-model/*', 'update'),
('/process-models/some-process-group/some-process-model/*', 'create'),
('/process-models/some-process-group/some-process-model/*', 'delete'),
('/process-models/some-process-group/some-process-model/*', 'read'),
('/process-models/some-process-group/some-process-model/*', 'update'),
('/task-data/some-process-group/some-process-model/*', 'create'),
('/task-data/some-process-group/some-process-model/*', 'delete'),
('/task-data/some-process-group/some-process-model/*', 'read'),
('/task-data/some-process-group/some-process-model/*', 'update'),
("/logs/some-process-group/some-process-model/*", "create"),
("/logs/some-process-group/some-process-model/*", "delete"),
("/logs/some-process-group/some-process-model/*", "read"),
("/logs/some-process-group/some-process-model/*", "update"),
("/process-groups/some-process-group/some-process-model/*", "create"),
("/process-groups/some-process-group/some-process-model/*", "delete"),
("/process-groups/some-process-group/some-process-model/*", "read"),
("/process-groups/some-process-group/some-process-model/*", "update"),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"create",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"delete",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"read",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"update",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"create",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"delete",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"read",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"update",
),
("/process-instances/some-process-group/some-process-model/*", "create"),
("/process-instances/some-process-group/some-process-model/*", "delete"),
("/process-instances/some-process-group/some-process-model/*", "read"),
("/process-instances/some-process-group/some-process-model/*", "update"),
("/process-models/some-process-group/some-process-model/*", "create"),
("/process-models/some-process-group/some-process-model/*", "delete"),
("/process-models/some-process-group/some-process-model/*", "read"),
("/process-models/some-process-group/some-process-model/*", "update"),
("/task-data/some-process-group/some-process-model/*", "create"),
("/task-data/some-process-group/some-process-model/*", "delete"),
("/task-data/some-process-group/some-process-model/*", "read"),
("/task-data/some-process-group/some-process-model/*", "update"),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'PG:/some-process-group/some-process-model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions(
"all", "PG:/some-process-group/some-process-model"
)
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_start_on_process_group(
@ -191,12 +221,20 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_group."""
expected_permissions = [
('/process-instances/for-me/some-process-group/some-process-model/*', 'read'),
('/process-instances/some-process-group/some-process-model/*', 'create'),
(
"/process-instances/for-me/some-process-group/some-process-model/*",
"read",
),
("/process-instances/some-process-group/some-process-model/*", "create"),
]
permissions_to_assign = AuthorizationService.explode_permissions('start', 'PG:/some-process-group/some-process-model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PG:/some-process-group/some-process-model"
)
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_all_on_process_model(
@ -205,34 +243,63 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_all_on_process_model."""
expected_permissions = [
('/logs/some-process-group/some-process-model/*', 'create'),
('/logs/some-process-group/some-process-model/*', 'delete'),
('/logs/some-process-group/some-process-model/*', 'read'),
('/logs/some-process-group/some-process-model/*', 'update'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'create'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'delete'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'read'),
('/process-instance-suspend/some-process-group/some-process-model/*', 'update'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'create'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'delete'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'read'),
('/process-instance-terminate/some-process-group/some-process-model/*', 'update'),
('/process-instances/some-process-group/some-process-model/*', 'create'),
('/process-instances/some-process-group/some-process-model/*', 'delete'),
('/process-instances/some-process-group/some-process-model/*', 'read'),
('/process-instances/some-process-group/some-process-model/*', 'update'),
('/process-models/some-process-group/some-process-model/*', 'create'),
('/process-models/some-process-group/some-process-model/*', 'delete'),
('/process-models/some-process-group/some-process-model/*', 'read'),
('/process-models/some-process-group/some-process-model/*', 'update'),
('/task-data/some-process-group/some-process-model/*', 'create'),
('/task-data/some-process-group/some-process-model/*', 'delete'),
('/task-data/some-process-group/some-process-model/*', 'read'),
('/task-data/some-process-group/some-process-model/*', 'update'),
("/logs/some-process-group/some-process-model/*", "create"),
("/logs/some-process-group/some-process-model/*", "delete"),
("/logs/some-process-group/some-process-model/*", "read"),
("/logs/some-process-group/some-process-model/*", "update"),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"create",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"delete",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"read",
),
(
"/process-instance-suspend/some-process-group/some-process-model/*",
"update",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"create",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"delete",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"read",
),
(
"/process-instance-terminate/some-process-group/some-process-model/*",
"update",
),
("/process-instances/some-process-group/some-process-model/*", "create"),
("/process-instances/some-process-group/some-process-model/*", "delete"),
("/process-instances/some-process-group/some-process-model/*", "read"),
("/process-instances/some-process-group/some-process-model/*", "update"),
("/process-models/some-process-group/some-process-model/*", "create"),
("/process-models/some-process-group/some-process-model/*", "delete"),
("/process-models/some-process-group/some-process-model/*", "read"),
("/process-models/some-process-group/some-process-model/*", "update"),
("/task-data/some-process-group/some-process-model/*", "create"),
("/task-data/some-process-group/some-process-model/*", "delete"),
("/task-data/some-process-group/some-process-model/*", "read"),
("/task-data/some-process-group/some-process-model/*", "update"),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'PM:/some-process-group/some-process-model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions(
"all", "PM:/some-process-group/some-process-model"
)
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_start_on_process_model(
@ -241,12 +308,20 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_model."""
expected_permissions = [
('/process-instances/for-me/some-process-group/some-process-model/*', 'read'),
('/process-instances/some-process-group/some-process-model/*', 'create'),
(
"/process-instances/for-me/some-process-group/some-process-model/*",
"read",
),
("/process-instances/some-process-group/some-process-model/*", "create"),
]
permissions_to_assign = AuthorizationService.explode_permissions('start', 'PM:/some-process-group/some-process-model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PM:/some-process-group/some-process-model"
)
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_basic(
@ -255,22 +330,25 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_basic."""
expected_permissions = [
('/process-instances/for-me', 'read'),
('/process-instances/reports/*', 'create'),
('/process-instances/reports/*', 'delete'),
('/process-instances/reports/*', 'read'),
('/process-instances/reports/*', 'update'),
('/processes', 'read'),
('/service-tasks', 'read'),
('/tasks/*', 'create'),
('/tasks/*', 'delete'),
('/tasks/*', 'read'),
('/tasks/*', 'update'),
('/user-groups/for-current-user', 'read'),
("/process-instances/for-me", "read"),
("/process-instances/reports/*", "create"),
("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"),
("/processes", "read"),
("/service-tasks", "read"),
("/tasks/*", "create"),
("/tasks/*", "delete"),
("/tasks/*", "read"),
("/tasks/*", "update"),
("/user-groups/for-current-user", "read"),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'BASIC')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions("all", "BASIC")
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_all(
@ -279,14 +357,17 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_all."""
expected_permissions = [
('/*', 'create'),
('/*', 'delete'),
('/*', 'read'),
('/*', 'update'),
("/*", "create"),
("/*", "delete"),
("/*", "read"),
("/*", "update"),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', 'ALL')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions("all", "ALL")
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_target_uri(
@ -295,14 +376,19 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_with_target_uri."""
expected_permissions = [
('/hey/model', 'create'),
('/hey/model', 'delete'),
('/hey/model', 'read'),
('/hey/model', 'update'),
("/hey/model", "create"),
("/hey/model", "delete"),
("/hey/model", "read"),
("/hey/model", "update"),
]
permissions_to_assign = AuthorizationService.explode_permissions('all', '/hey/model')
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
permissions_to_assign = AuthorizationService.explode_permissions(
"all", "/hey/model"
)
permissions_to_assign_tuples = sorted(
[(p.target_uri, p.permission) for p in permissions_to_assign]
)
assert permissions_to_assign_tuples == expected_permissions
def test_explode_permissions_with_invalid_target_uri(
@ -311,8 +397,9 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_with_invalid_target_uri."""
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('all', 'BAD_MACRO')
AuthorizationService.explode_permissions("all", "BAD_MACRO")
def test_explode_permissions_with_start_to_incorrect_target(
self,
@ -320,5 +407,6 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_with_start_to_incorrect_target."""
with pytest.raises(InvalidPermissionError):
AuthorizationService.explode_permissions('start', '/hey/model')
AuthorizationService.explode_permissions("start", "/hey/model")