refactored import perms from yaml and from dmn tables to do the same thing w/ burnettk
This commit is contained in:
parent
5b793d5a81
commit
84f3847c50
|
@ -143,6 +143,7 @@ SPIFFWORKFLOW_BACKEND_ALLOW_CONFISCATING_LOCK_AFTER_SECONDS = int(
|
||||||
environ.get("SPIFFWORKFLOW_BACKEND_ALLOW_CONFISCATING_LOCK_AFTER_SECONDS", default="600")
|
environ.get("SPIFFWORKFLOW_BACKEND_ALLOW_CONFISCATING_LOCK_AFTER_SECONDS", default="600")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# FIXME: do not default this but we will need to coordinate release of it since it is a breaking change
|
||||||
SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP = environ.get("SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP", default="everybody")
|
SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP = environ.get("SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP", default="everybody")
|
||||||
|
|
||||||
SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND = environ.get(
|
SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND = environ.get(
|
||||||
|
|
|
@ -1,5 +1,3 @@
|
||||||
default_group: everybody
|
|
||||||
|
|
||||||
users:
|
users:
|
||||||
testadmin1:
|
testadmin1:
|
||||||
service: https://testing/openid/thing
|
service: https://testing/openid/thing
|
||||||
|
@ -20,49 +18,41 @@ groups:
|
||||||
permissions:
|
permissions:
|
||||||
admin:
|
admin:
|
||||||
groups: [admin]
|
groups: [admin]
|
||||||
users: []
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /*
|
uri: /*
|
||||||
|
|
||||||
read-all:
|
read-all:
|
||||||
groups: ["Finance Team", hr, admin]
|
groups: ["Finance Team", hr, admin]
|
||||||
users: []
|
|
||||||
allowed_permissions: [read]
|
allowed_permissions: [read]
|
||||||
uri: /*
|
uri: /*
|
||||||
|
|
||||||
process-instances-find-by-id:
|
process-instances-find-by-id:
|
||||||
groups: [everybody]
|
groups: [everybody]
|
||||||
users: []
|
|
||||||
allowed_permissions: [read]
|
allowed_permissions: [read]
|
||||||
uri: /process-instances/find-by-id/*
|
uri: /process-instances/find-by-id/*
|
||||||
|
|
||||||
tasks-crud:
|
tasks-crud:
|
||||||
groups: [everybody]
|
groups: [everybody]
|
||||||
users: []
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /tasks/*
|
uri: /tasks/*
|
||||||
|
|
||||||
# TODO: all uris should really have the same structure
|
# TODO: all uris should really have the same structure
|
||||||
finance-admin-group:
|
finance-admin-group:
|
||||||
groups: ["Finance Team"]
|
groups: ["Finance Team"]
|
||||||
users: [testuser4]
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /process-groups/finance/*
|
uri: /process-groups/finance/*
|
||||||
|
|
||||||
finance-admin-model:
|
finance-admin-model:
|
||||||
groups: ["Finance Team"]
|
groups: ["Finance Team"]
|
||||||
users: [testuser4]
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /process-models/finance/*
|
uri: /process-models/finance/*
|
||||||
|
|
||||||
finance-admin-model-lanes:
|
finance-admin-model-lanes:
|
||||||
groups: ["Finance Team"]
|
groups: ["Finance Team"]
|
||||||
users: [testuser4]
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /process-models/finance:model_with_lanes/*
|
uri: /process-models/finance:model_with_lanes/*
|
||||||
|
|
||||||
finance-admin-instance-run:
|
finance-admin-instance-run:
|
||||||
groups: ["Finance Team"]
|
groups: ["Finance Team"]
|
||||||
users: [testuser4]
|
|
||||||
allowed_permissions: [create, read, update, delete]
|
allowed_permissions: [create, read, update, delete]
|
||||||
uri: /process-instances/*
|
uri: /process-instances/*
|
||||||
|
|
|
@ -35,4 +35,5 @@ class RefreshPermissions(Script):
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
group_info = args[0]
|
group_info = args[0]
|
||||||
|
import pdb; pdb.set_trace()
|
||||||
AuthorizationService.refresh_permissions(group_info)
|
AuthorizationService.refresh_permissions(group_info)
|
||||||
|
|
|
@ -21,6 +21,7 @@ from sqlalchemy import or_
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
|
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
|
||||||
|
from spiffworkflow_backend.models import permission_assignment
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.group import GroupModel
|
from spiffworkflow_backend.models.group import GroupModel
|
||||||
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
from spiffworkflow_backend.models.human_task import HumanTaskModel
|
||||||
|
@ -196,88 +197,13 @@ class AuthorizationService:
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def import_permissions_from_yaml_file(cls, raise_if_missing_user: bool = False) -> DesiredPermissionDict:
|
def import_permissions_from_yaml_file(cls, user_model: Optional[UserModel] = None) -> DesiredPermissionDict:
|
||||||
"""Import_permissions_from_yaml_file."""
|
group_permissions = cls.parse_permissions_yaml_into_group_info()
|
||||||
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
|
result = cls.add_permissions_from_group_permissions(group_permissions, user_model)
|
||||||
raise (
|
return result
|
||||||
PermissionsFileNotSetError(
|
|
||||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME needs to be set in order to import permissions"
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
permission_configs = None
|
|
||||||
with open(current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH"]) as file:
|
|
||||||
permission_configs = yaml.safe_load(file)
|
|
||||||
|
|
||||||
default_group = None
|
|
||||||
unique_user_group_identifiers: Set[str] = set()
|
|
||||||
user_to_group_identifiers: list[UserToGroupDict] = []
|
|
||||||
if "default_group" in permission_configs:
|
|
||||||
default_group_identifier = permission_configs["default_group"]
|
|
||||||
default_group = GroupService.find_or_create_group(default_group_identifier)
|
|
||||||
unique_user_group_identifiers.add(default_group_identifier)
|
|
||||||
|
|
||||||
if "groups" in permission_configs:
|
|
||||||
for group_identifier, group_config in permission_configs["groups"].items():
|
|
||||||
group = GroupService.find_or_create_group(group_identifier)
|
|
||||||
unique_user_group_identifiers.add(group_identifier)
|
|
||||||
for username in group_config["users"]:
|
|
||||||
user = UserModel.query.filter_by(username=username).first()
|
|
||||||
if user is None:
|
|
||||||
if raise_if_missing_user:
|
|
||||||
raise (UserNotFoundError(f"Could not find a user with name: {username}"))
|
|
||||||
continue
|
|
||||||
user_to_group_dict: UserToGroupDict = {
|
|
||||||
"username": user.username,
|
|
||||||
"group_identifier": group_identifier,
|
|
||||||
}
|
|
||||||
user_to_group_identifiers.append(user_to_group_dict)
|
|
||||||
cls.associate_user_with_group(user, group)
|
|
||||||
|
|
||||||
permission_assignments = []
|
|
||||||
if "permissions" in permission_configs:
|
|
||||||
for _permission_identifier, permission_config in permission_configs["permissions"].items():
|
|
||||||
uri = permission_config["uri"]
|
|
||||||
permission_target = cls.find_or_create_permission_target(uri)
|
|
||||||
|
|
||||||
for allowed_permission in permission_config["allowed_permissions"]:
|
|
||||||
if "groups" in permission_config:
|
|
||||||
for group_identifier in permission_config["groups"]:
|
|
||||||
group = GroupService.find_or_create_group(group_identifier)
|
|
||||||
unique_user_group_identifiers.add(group_identifier)
|
|
||||||
permission_assignments.append(
|
|
||||||
cls.create_permission_for_principal(
|
|
||||||
group.principal,
|
|
||||||
permission_target,
|
|
||||||
allowed_permission,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
if "users" in permission_config:
|
|
||||||
for username in permission_config["users"]:
|
|
||||||
user = UserModel.query.filter_by(username=username).first()
|
|
||||||
if user is not None:
|
|
||||||
principal = (
|
|
||||||
PrincipalModel.query.join(UserModel).filter(UserModel.username == username).first()
|
|
||||||
)
|
|
||||||
permission_assignments.append(
|
|
||||||
cls.create_permission_for_principal(
|
|
||||||
principal, permission_target, allowed_permission
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
if default_group is not None:
|
|
||||||
for user in UserModel.query.all():
|
|
||||||
cls.associate_user_with_group(user, default_group)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"group_identifiers": unique_user_group_identifiers,
|
|
||||||
"permission_assignments": permission_assignments,
|
|
||||||
"user_to_group_identifiers": user_to_group_identifiers,
|
|
||||||
}
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
|
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
|
||||||
"""Find_or_create_permission_target."""
|
|
||||||
uri_with_percent = re.sub(r"\*", "%", uri)
|
uri_with_percent = re.sub(r"\*", "%", uri)
|
||||||
target_uri_normalized = uri_with_percent.removeprefix(V1_API_PATH_PREFIX)
|
target_uri_normalized = uri_with_percent.removeprefix(V1_API_PATH_PREFIX)
|
||||||
permission_target: Optional[PermissionTargetModel] = PermissionTargetModel.query.filter_by(
|
permission_target: Optional[PermissionTargetModel] = PermissionTargetModel.query.filter_by(
|
||||||
|
@ -296,7 +222,6 @@ class AuthorizationService:
|
||||||
permission_target: PermissionTargetModel,
|
permission_target: PermissionTargetModel,
|
||||||
permission: str,
|
permission: str,
|
||||||
) -> PermissionAssignmentModel:
|
) -> PermissionAssignmentModel:
|
||||||
"""Create_permission_for_principal."""
|
|
||||||
permission_assignment: Optional[PermissionAssignmentModel] = PermissionAssignmentModel.query.filter_by(
|
permission_assignment: Optional[PermissionAssignmentModel] = PermissionAssignmentModel.query.filter_by(
|
||||||
principal_id=principal.id,
|
principal_id=principal.id,
|
||||||
permission_target_id=permission_target.id,
|
permission_target_id=permission_target.id,
|
||||||
|
@ -315,7 +240,6 @@ class AuthorizationService:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def should_disable_auth_for_request(cls) -> bool:
|
def should_disable_auth_for_request(cls) -> bool:
|
||||||
"""Should_disable_auth_for_request."""
|
|
||||||
swagger_functions = ["get_json_spec"]
|
swagger_functions = ["get_json_spec"]
|
||||||
authentication_exclusion_list = [
|
authentication_exclusion_list = [
|
||||||
"status",
|
"status",
|
||||||
|
@ -353,7 +277,6 @@ class AuthorizationService:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_permission_from_http_method(cls, http_method: str) -> Optional[str]:
|
def get_permission_from_http_method(cls, http_method: str) -> Optional[str]:
|
||||||
"""Get_permission_from_request_method."""
|
|
||||||
request_method_mapper = {
|
request_method_mapper = {
|
||||||
"POST": "create",
|
"POST": "create",
|
||||||
"GET": "read",
|
"GET": "read",
|
||||||
|
@ -515,7 +438,7 @@ class AuthorizationService:
|
||||||
# we are also a little apprehensive about pre-creating users
|
# we are also a little apprehensive about pre-creating users
|
||||||
# before the user signs in, because we won't know things like
|
# before the user signs in, because we won't know things like
|
||||||
# the external service user identifier.
|
# the external service user identifier.
|
||||||
cls.import_permissions_from_yaml_file()
|
cls.import_permissions_from_yaml_file(user_model)
|
||||||
|
|
||||||
if is_new_user:
|
if is_new_user:
|
||||||
UserService.add_user_to_human_tasks_if_appropriate(user_model)
|
UserService.add_user_to_human_tasks_if_appropriate(user_model)
|
||||||
|
@ -720,11 +643,9 @@ class AuthorizationService:
|
||||||
with open(current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH"]) as file:
|
with open(current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_ABSOLUTE_PATH"]) as file:
|
||||||
permission_configs = yaml.safe_load(file)
|
permission_configs = yaml.safe_load(file)
|
||||||
|
|
||||||
group_permissions: list[GroupPermissionsDict] = []
|
|
||||||
group_permissions_by_group: dict[str, GroupPermissionsDict] = {}
|
group_permissions_by_group: dict[str, GroupPermissionsDict] = {}
|
||||||
if "default_group" in permission_configs:
|
if current_app.config['SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP']:
|
||||||
default_group_identifier = permission_configs["default_group"]
|
default_group_identifier = current_app.config['SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP']
|
||||||
# group_permissions.append({"name": default_group_identifier, "users": [], "permissions": []})
|
|
||||||
group_permissions_by_group[default_group_identifier] = {"name": default_group_identifier, "users": [], "permissions": []}
|
group_permissions_by_group[default_group_identifier] = {"name": default_group_identifier, "users": [], "permissions": []}
|
||||||
|
|
||||||
if "groups" in permission_configs:
|
if "groups" in permission_configs:
|
||||||
|
@ -738,25 +659,25 @@ class AuthorizationService:
|
||||||
for _permission_identifier, permission_config in permission_configs["permissions"].items():
|
for _permission_identifier, permission_config in permission_configs["permissions"].items():
|
||||||
uri = permission_config["uri"]
|
uri = permission_config["uri"]
|
||||||
for group_identifier in permission_config["groups"]:
|
for group_identifier in permission_config["groups"]:
|
||||||
group_permissions_by_group[group_identifier]['permissions'].append({'actions': permission_config["allowed_permissions"], "uri": uri})
|
group_permissions_by_group[group_identifier]['permissions'].append(
|
||||||
|
{'actions': permission_config["allowed_permissions"], "uri": uri}
|
||||||
|
)
|
||||||
|
|
||||||
for _group_identifier, group_permission in group_permissions_by_group.items():
|
return list(group_permissions_by_group.values())
|
||||||
group_permissions.append(group_permission)
|
|
||||||
|
|
||||||
return group_permissions
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def refresh_permissions(cls, group_info: list[GroupPermissionsDict]) -> None:
|
def add_permissions_from_group_permissions(cls, group_permissions: list[GroupPermissionsDict], user_model: Optional[UserModel] = None) -> DesiredPermissionDict:
|
||||||
"""Adds new permission assignments and deletes old ones."""
|
unique_user_group_identifiers: Set[str] = set()
|
||||||
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
user_to_group_identifiers: list[UserToGroupDict] = []
|
||||||
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
permission_assignments = []
|
||||||
group_info = group_info + cls.parse_permissions_yaml_into_group_info()
|
|
||||||
|
|
||||||
desired_group_identifiers: Set[str] = set()
|
default_group = None
|
||||||
desired_permission_assignments: list[PermissionAssignmentModel] = []
|
default_group_identifier = current_app.config['SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP']
|
||||||
desired_user_to_group_identifiers: list[UserToGroupDict] = []
|
if default_group_identifier:
|
||||||
|
default_group = GroupService.find_or_create_group(default_group_identifier)
|
||||||
|
unique_user_group_identifiers.add(default_group_identifier)
|
||||||
|
|
||||||
for group in group_info:
|
for group in group_permissions:
|
||||||
group_identifier = group["name"]
|
group_identifier = group["name"]
|
||||||
GroupService.find_or_create_group(group_identifier)
|
GroupService.find_or_create_group(group_identifier)
|
||||||
for username in group["users"]:
|
for username in group["users"]:
|
||||||
|
@ -764,19 +685,44 @@ class AuthorizationService:
|
||||||
"username": username,
|
"username": username,
|
||||||
"group_identifier": group_identifier,
|
"group_identifier": group_identifier,
|
||||||
}
|
}
|
||||||
desired_user_to_group_identifiers.append(user_to_group_dict)
|
user_to_group_identifiers.append(user_to_group_dict)
|
||||||
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
|
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
|
||||||
desired_group_identifiers.add(group_identifier)
|
unique_user_group_identifiers.add(group_identifier)
|
||||||
for permission in group["permissions"]:
|
for permission in group["permissions"]:
|
||||||
for crud_op in permission["actions"]:
|
for crud_op in permission["actions"]:
|
||||||
desired_permission_assignments.extend(
|
permission_assignments.extend(
|
||||||
cls.add_permission_from_uri_or_macro(
|
cls.add_permission_from_uri_or_macro(
|
||||||
group_identifier=group_identifier,
|
group_identifier=group_identifier,
|
||||||
target=permission["uri"],
|
target=permission["uri"],
|
||||||
permission=crud_op,
|
permission=crud_op,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
desired_group_identifiers.add(group_identifier)
|
unique_user_group_identifiers.add(group_identifier)
|
||||||
|
|
||||||
|
if default_group is not None:
|
||||||
|
if user_model:
|
||||||
|
cls.associate_user_with_group(user_model, default_group)
|
||||||
|
else:
|
||||||
|
for user in UserModel.query.all():
|
||||||
|
cls.associate_user_with_group(user, default_group)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"group_identifiers": unique_user_group_identifiers,
|
||||||
|
"permission_assignments": permission_assignments,
|
||||||
|
"user_to_group_identifiers": user_to_group_identifiers,
|
||||||
|
}
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
|
||||||
|
"""Adds new permission assignments and deletes old ones."""
|
||||||
|
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
||||||
|
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
||||||
|
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
|
||||||
|
|
||||||
|
result = cls.add_permissions_from_group_permissions(group_permissions)
|
||||||
|
desired_permission_assignments = result["permission_assignments"]
|
||||||
|
desired_group_identifiers = result["group_identifiers"]
|
||||||
|
desired_user_to_group_identifiers = result["user_to_group_identifiers"]
|
||||||
|
|
||||||
for ipa in initial_permission_assignments:
|
for ipa in initial_permission_assignments:
|
||||||
if ipa not in desired_permission_assignments:
|
if ipa not in desired_permission_assignments:
|
||||||
|
@ -801,10 +747,3 @@ class AuthorizationService:
|
||||||
for gtd in groups_to_delete:
|
for gtd in groups_to_delete:
|
||||||
db.session.delete(gtd)
|
db.session.delete(gtd)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
|
||||||
class KeycloakAuthorization:
|
|
||||||
"""Interface with Keycloak server."""
|
|
||||||
|
|
||||||
|
|
||||||
# class KeycloakClient:
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
"""Group_service."""
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
|
@ -8,11 +7,8 @@ from spiffworkflow_backend.services.user_service import UserService
|
||||||
|
|
||||||
|
|
||||||
class GroupService:
|
class GroupService:
|
||||||
"""GroupService."""
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def find_or_create_group(cls, group_identifier: str) -> GroupModel:
|
def find_or_create_group(cls, group_identifier: str) -> GroupModel:
|
||||||
"""Find_or_create_group."""
|
|
||||||
group: Optional[GroupModel] = GroupModel.query.filter_by(identifier=group_identifier).first()
|
group: Optional[GroupModel] = GroupModel.query.filter_by(identifier=group_identifier).first()
|
||||||
if group is None:
|
if group is None:
|
||||||
group = GroupModel(identifier=group_identifier)
|
group = GroupModel(identifier=group_identifier)
|
||||||
|
@ -23,7 +19,6 @@ class GroupService:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
|
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
|
||||||
"""Add_user_to_group_or_add_to_waiting."""
|
|
||||||
group = cls.find_or_create_group(group_identifier)
|
group = cls.find_or_create_group(group_identifier)
|
||||||
user = UserModel.query.filter_by(username=username).first()
|
user = UserModel.query.filter_by(username=username).first()
|
||||||
if user:
|
if user:
|
||||||
|
|
|
@ -21,19 +21,10 @@ from spiffworkflow_backend.services.user_service import UserService
|
||||||
|
|
||||||
|
|
||||||
class TestAuthorizationService(BaseTest):
|
class TestAuthorizationService(BaseTest):
|
||||||
"""TestAuthorizationService."""
|
|
||||||
|
|
||||||
def test_can_raise_if_missing_user(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
|
||||||
"""Test_can_raise_if_missing_user."""
|
|
||||||
with pytest.raises(UserNotFoundError):
|
|
||||||
AuthorizationService.import_permissions_from_yaml_file(raise_if_missing_user=True)
|
|
||||||
|
|
||||||
def test_does_not_fail_if_user_not_created(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
def test_does_not_fail_if_user_not_created(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||||
"""Test_does_not_fail_if_user_not_created."""
|
|
||||||
AuthorizationService.import_permissions_from_yaml_file()
|
AuthorizationService.import_permissions_from_yaml_file()
|
||||||
|
|
||||||
def test_can_import_permissions_from_yaml(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
def test_can_import_permissions_from_yaml(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||||
"""Test_can_import_permissions_from_yaml."""
|
|
||||||
usernames = [
|
usernames = [
|
||||||
"testadmin1",
|
"testadmin1",
|
||||||
"testadmin2",
|
"testadmin2",
|
||||||
|
@ -59,8 +50,6 @@ class TestAuthorizationService(BaseTest):
|
||||||
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/finance/model1")
|
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/finance/model1")
|
||||||
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/finance/")
|
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/finance/")
|
||||||
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/", expected_result=False)
|
self.assert_user_has_permission(users["testuser1"], "update", "/v1.0/process-groups/", expected_result=False)
|
||||||
self.assert_user_has_permission(users["testuser4"], "update", "/v1.0/process-groups/finance/model1")
|
|
||||||
# via the user, not the group
|
|
||||||
self.assert_user_has_permission(users["testuser4"], "read", "/v1.0/process-groups/finance/model1")
|
self.assert_user_has_permission(users["testuser4"], "read", "/v1.0/process-groups/finance/model1")
|
||||||
self.assert_user_has_permission(users["testuser2"], "update", "/v1.0/process-groups/finance/model1")
|
self.assert_user_has_permission(users["testuser2"], "update", "/v1.0/process-groups/finance/model1")
|
||||||
self.assert_user_has_permission(users["testuser2"], "update", "/v1.0/process-groups/", expected_result=False)
|
self.assert_user_has_permission(users["testuser2"], "update", "/v1.0/process-groups/", expected_result=False)
|
||||||
|
@ -387,7 +376,6 @@ class TestAuthorizationService(BaseTest):
|
||||||
client: FlaskClient,
|
client: FlaskClient,
|
||||||
with_db_and_bpmn_file_cleanup: None,
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test_can_refresh_permissions."""
|
|
||||||
user = self.find_or_create_user(username="user_one")
|
user = self.find_or_create_user(username="user_one")
|
||||||
user_two = self.find_or_create_user(username="user_two")
|
user_two = self.find_or_create_user(username="user_two")
|
||||||
admin_user = self.find_or_create_user(username="testadmin1")
|
admin_user = self.find_or_create_user(username="testadmin1")
|
||||||
|
@ -410,6 +398,11 @@ class TestAuthorizationService(BaseTest):
|
||||||
"name": "group_three",
|
"name": "group_three",
|
||||||
"permissions": [{"actions": ["create", "read"], "uri": "PG:hey2"}],
|
"permissions": [{"actions": ["create", "read"], "uri": "PG:hey2"}],
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"users": [],
|
||||||
|
"name": "everybody",
|
||||||
|
"permissions": [{"actions": ["read"], "uri": "PG:hey2everybody"}],
|
||||||
|
},
|
||||||
]
|
]
|
||||||
AuthorizationService.refresh_permissions(group_info)
|
AuthorizationService.refresh_permissions(group_info)
|
||||||
assert GroupModel.query.filter_by(identifier="group_two").first() is None
|
assert GroupModel.query.filter_by(identifier="group_two").first() is None
|
||||||
|
@ -418,6 +411,7 @@ class TestAuthorizationService(BaseTest):
|
||||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
|
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
|
||||||
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
|
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
|
||||||
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
|
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
|
||||||
|
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey2everybody:yo")
|
||||||
|
|
||||||
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey")
|
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey")
|
||||||
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey:yo")
|
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey:yo")
|
||||||
|
|
Loading…
Reference in New Issue