added method to import permissions from yml file w/ burnettk
This commit is contained in:
parent
22ba89ae4f
commit
e4ded8fc05
|
@ -1,8 +1,8 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: 88e30afd19ac
|
||||
Revision ID: 5f7d61fa371c
|
||||
Revises:
|
||||
Create Date: 2022-10-11 09:39:40.882490
|
||||
Create Date: 2022-10-11 14:45:41.213890
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
|||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '88e30afd19ac'
|
||||
revision = '5f7d61fa371c'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
@ -226,8 +226,8 @@ def upgrade():
|
|||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('principal_id', sa.Integer(), nullable=False),
|
||||
sa.Column('permission_target_id', sa.Integer(), nullable=False),
|
||||
sa.Column('grant_type', sa.Enum('permit', 'deny', name='permitdeny'), nullable=True),
|
||||
sa.Column('permission', sa.Enum('create', 'read', 'update', 'delete', 'list', 'instantiate', name='permission'), nullable=True),
|
||||
sa.Column('grant_type', sa.String(length=50), nullable=True),
|
||||
sa.Column('permission', sa.String(length=50), nullable=True),
|
||||
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
|
||||
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
|
||||
sa.PrimaryKeyConstraint('id'),
|
32
perms.yml
32
perms.yml
|
@ -1,32 +0,0 @@
|
|||
group-admin:
|
||||
type: Group
|
||||
users: [jakub, kb, alex, dan, mike, jason]
|
||||
|
||||
group-finance:
|
||||
type: Group
|
||||
users: [harmeet, sasha]
|
||||
|
||||
group-hr:
|
||||
type: Group
|
||||
users: [manuchehr]
|
||||
|
||||
permission-admin:
|
||||
type: Permission
|
||||
groups: [group-admin]
|
||||
users: []
|
||||
allowed_permissions: [CREATE, READ, UPDATE, DELETE, LIST, INSTANTIATE]
|
||||
uri: /*
|
||||
|
||||
permission-finance-admin:
|
||||
type: Permission
|
||||
groups: [group-a]
|
||||
users: []
|
||||
allowed_permissions: [CREATE, READ, UPDATE, DELETE]
|
||||
uri: /v1.0/process-groups/finance/*
|
||||
|
||||
permission-read-all:
|
||||
type: Permission
|
||||
groups: [group-finance, group-hr, group-admin]
|
||||
users: []
|
||||
allowed_permissions: [READ]
|
||||
uri: /*
|
|
@ -1866,7 +1866,7 @@ pytz = "*"
|
|||
type = "git"
|
||||
url = "https://github.com/sartography/SpiffWorkflow"
|
||||
reference = "main"
|
||||
resolved_reference = "f30221dc6bb52753952369fc3bc5ee55ceef762f"
|
||||
resolved_reference = "63db3e45947ec66b8d0efc2c74064004f8ff482c"
|
||||
|
||||
[[package]]
|
||||
name = "SQLAlchemy"
|
||||
|
@ -1986,6 +1986,14 @@ category = "main"
|
|||
optional = false
|
||||
python-versions = "*"
|
||||
|
||||
[[package]]
|
||||
name = "types-PyYAML"
|
||||
version = "6.0.12"
|
||||
description = "Typing stubs for PyYAML"
|
||||
category = "main"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
|
||||
[[package]]
|
||||
name = "types-requests"
|
||||
version = "2.28.11.1"
|
||||
|
@ -2178,7 +2186,7 @@ testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=
|
|||
[metadata]
|
||||
lock-version = "1.1"
|
||||
python-versions = ">=3.9,<3.11"
|
||||
content-hash = "5a1aa6f7788fe750e6c4f5681b71d7a8269806da061be6565746e3a23203196f"
|
||||
content-hash = "d05da4b7176e705f6962576bd25332a2514b94eac99b48f19bf950f073a2ddad"
|
||||
|
||||
[metadata.files]
|
||||
alabaster = [
|
||||
|
@ -3396,6 +3404,10 @@ types-pytz = [
|
|||
{file = "types-pytz-2022.4.0.0.tar.gz", hash = "sha256:17d66e4b16e80ceae0787726f3a22288df7d3f9fdebeb091dc64b92c0e4ea09d"},
|
||||
{file = "types_pytz-2022.4.0.0-py3-none-any.whl", hash = "sha256:950b0f3d64ed5b03a3e29c1e38fe2be8371c933c8e97922d0352345336eb8af4"},
|
||||
]
|
||||
types-PyYAML = [
|
||||
{file = "types-PyYAML-6.0.12.tar.gz", hash = "sha256:f6f350418125872f3f0409d96a62a5a5ceb45231af5cc07ee0034ec48a3c82fa"},
|
||||
{file = "types_PyYAML-6.0.12-py3-none-any.whl", hash = "sha256:29228db9f82df4f1b7febee06bbfb601677882e98a3da98132e31c6874163e15"},
|
||||
]
|
||||
types-requests = [
|
||||
{file = "types-requests-2.28.11.1.tar.gz", hash = "sha256:02b1806c5b9904edcd87fa29236164aea0e6cdc4d93ea020cd615ef65cb43d65"},
|
||||
{file = "types_requests-2.28.11.1-py3-none-any.whl", hash = "sha256:1ff2c1301f6fe58b5d1c66cdf631ca19734cb3b1a4bbadc878d75557d183291a"},
|
||||
|
|
|
@ -55,6 +55,7 @@ Jinja2 = "^3.1.2"
|
|||
RestrictedPython = "^5.2"
|
||||
Flask-SQLAlchemy = "^3"
|
||||
orjson = "^3.8.0"
|
||||
types-PyYAML = "^6.0.12"
|
||||
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
|
|
|
@ -57,6 +57,15 @@ def setup_config(app: Flask) -> None:
|
|||
setup_database_uri(app)
|
||||
setup_logger(app)
|
||||
|
||||
app.config["PERMISSIONS_FILE_FULLPATH"] = None
|
||||
if app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"]:
|
||||
app.config["PERMISSIONS_FILE_FULLPATH"] = os.path.join(
|
||||
app.root_path,
|
||||
"config",
|
||||
"permissions",
|
||||
app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"],
|
||||
)
|
||||
|
||||
env_config_module = "spiffworkflow_backend.config." + app.config["ENV_IDENTIFIER"]
|
||||
try:
|
||||
app.config.from_object(env_config_module)
|
||||
|
|
|
@ -41,3 +41,7 @@ SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = (
|
|||
CONNECTOR_PROXY_URL = environ.get(
|
||||
"CONNECTOR_PROXY_URL", default="http://localhost:7004"
|
||||
)
|
||||
|
||||
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"
|
||||
)
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
groups:
|
||||
admin:
|
||||
users: [jakub, kb, alex, dan, mike, jason]
|
||||
|
||||
finance:
|
||||
users: [harmeet, sasha]
|
||||
|
||||
hr:
|
||||
users: [manuchehr]
|
||||
|
||||
permissions:
|
||||
admin:
|
||||
groups: [admin]
|
||||
users: []
|
||||
allowed_permissions: [CREATE, READ, UPDATE, DELETE, LIST, INSTANTIATE]
|
||||
uri: /*
|
||||
|
||||
finance-admin:
|
||||
groups: [finance]
|
||||
users: []
|
||||
allowed_permissions: [CREATE, READ, UPDATE, DELETE]
|
||||
uri: /v1.0/process-groups/finance/*
|
||||
|
||||
read-all:
|
||||
groups: [finance, hr, admin]
|
||||
users: []
|
||||
allowed_permissions: [READ]
|
||||
uri: /*
|
|
@ -0,0 +1,28 @@
|
|||
groups:
|
||||
admin:
|
||||
users: [testadmin1, testadmin2]
|
||||
|
||||
finance:
|
||||
users: [testuser1, testuser2]
|
||||
|
||||
hr:
|
||||
users: [testuser2, testuser3, testuser4]
|
||||
|
||||
permissions:
|
||||
admin:
|
||||
groups: [admin]
|
||||
users: []
|
||||
allowed_permissions: [create, read, update, delete, list, instantiate]
|
||||
uri: /*
|
||||
|
||||
finance-admin:
|
||||
groups: [finance]
|
||||
users: [testuser4]
|
||||
allowed_permissions: [create, read, update, delete]
|
||||
uri: /v1.0/process-groups/finance/*
|
||||
|
||||
read-all:
|
||||
groups: [finance, hr, admin]
|
||||
users: []
|
||||
allowed_permissions: [read]
|
||||
uri: /*
|
|
@ -7,3 +7,7 @@ SECRET_KEY = "the_secret_key"
|
|||
SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = (
|
||||
environ.get("SPIFFWORKFLOW_BACKEND_LOG_TO_FILE", default="true") == "true"
|
||||
)
|
||||
|
||||
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME", default="testing.yml"
|
||||
)
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
"""PermissionAssignment."""
|
||||
import enum
|
||||
from typing import Any
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from sqlalchemy import Enum
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.orm import validates
|
||||
|
||||
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
from spiffworkflow_backend.models.principal import PrincipalModel
|
||||
|
@ -26,12 +27,12 @@ class Permission(enum.Enum):
|
|||
# administer = 2
|
||||
# view_instance = 3
|
||||
|
||||
create = 1
|
||||
read = 2
|
||||
update = 3
|
||||
delete = 4
|
||||
list = 5
|
||||
instantiate = 6 # this is something you do to a process model
|
||||
create = "create"
|
||||
read = "read"
|
||||
update = "update"
|
||||
delete = "delete"
|
||||
list = "list"
|
||||
instantiate = "instantiate" # this is something you do to a process model
|
||||
|
||||
|
||||
class PermissionAssignmentModel(SpiffworkflowBaseDBModel):
|
||||
|
@ -51,5 +52,15 @@ class PermissionAssignmentModel(SpiffworkflowBaseDBModel):
|
|||
permission_target_id = db.Column(
|
||||
ForeignKey(PermissionTargetModel.id), nullable=False
|
||||
)
|
||||
grant_type = db.Column(Enum(PermitDeny))
|
||||
permission = db.Column(Enum(Permission))
|
||||
grant_type = db.Column(db.String(50))
|
||||
permission = db.Column(db.String(50))
|
||||
|
||||
@validates("grant_type")
|
||||
def validate_grant_type(self, key: str, value: str) -> Any:
|
||||
"""Validate_grant_type."""
|
||||
return self.validate_enum_field(key, value, PermitDeny)
|
||||
|
||||
@validates("permission")
|
||||
def validate_permission(self, key: str, value: str) -> Any:
|
||||
"""Validate_permission."""
|
||||
return self.validate_enum_field(key, value, Permission)
|
||||
|
|
|
@ -1,13 +1,14 @@
|
|||
"""PermissionTarget."""
|
||||
import re
|
||||
from typing import Any
|
||||
from sqlalchemy.orm import validates
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from sqlalchemy.orm import validates
|
||||
|
||||
|
||||
class InvalidPermissionTargetUri(Exception):
|
||||
pass
|
||||
class InvalidPermissionTargetUriError(Exception):
|
||||
"""InvalidPermissionTargetUriError."""
|
||||
|
||||
|
||||
class PermissionTargetModel(SpiffworkflowBaseDBModel):
|
||||
"""PermissionTargetModel."""
|
||||
|
@ -19,8 +20,9 @@ class PermissionTargetModel(SpiffworkflowBaseDBModel):
|
|||
|
||||
@validates("uri")
|
||||
def validate_uri(self, key: str, value: str) -> str:
|
||||
"""Validate_uri."""
|
||||
if re.search(r"%.", value):
|
||||
raise InvalidPermissionTargetUri(
|
||||
f"Invalid Permission Target Uri: {value}"
|
||||
raise InvalidPermissionTargetUriError(
|
||||
f"Wildcard must appear at end: {value}"
|
||||
)
|
||||
return value
|
||||
|
|
|
@ -17,6 +17,10 @@ from spiffworkflow_backend.services.authentication_service import (
|
|||
)
|
||||
|
||||
|
||||
class UserNotFoundError(Exception):
|
||||
"""UserNotFoundError."""
|
||||
|
||||
|
||||
class UserModel(SpiffworkflowBaseDBModel):
|
||||
"""UserModel."""
|
||||
|
||||
|
|
|
@ -1,18 +1,24 @@
|
|||
"""Authorization_service."""
|
||||
import re
|
||||
from typing import Any
|
||||
from typing import Union
|
||||
|
||||
from sqlalchemy import text
|
||||
|
||||
import jwt
|
||||
import yaml
|
||||
from flask import current_app
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
from sqlalchemy import text
|
||||
|
||||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
|
||||
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
from spiffworkflow_backend.models.principal import MissingPrincipalError
|
||||
from spiffworkflow_backend.models.principal import PrincipalModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.models.user import UserNotFoundError
|
||||
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
|
||||
|
||||
class AuthorizationService:
|
||||
|
@ -36,9 +42,9 @@ class AuthorizationService:
|
|||
)
|
||||
|
||||
for permission_assignment in permission_assignments:
|
||||
if permission_assignment.grant_type.value == "permit":
|
||||
if permission_assignment.grant_type == "permit":
|
||||
return True
|
||||
elif permission_assignment.grant_type.value == "deny":
|
||||
elif permission_assignment.grant_type == "deny":
|
||||
return False
|
||||
else:
|
||||
raise Exception("Unknown grant type")
|
||||
|
@ -66,6 +72,99 @@ class AuthorizationService:
|
|||
|
||||
return cls.has_permission(principals, permission, target_uri)
|
||||
|
||||
@classmethod
|
||||
def import_permissions_from_yaml_file(
|
||||
cls, raise_if_missing_user: bool = False
|
||||
) -> None:
|
||||
"""Import_permissions_from_yaml_file."""
|
||||
permission_configs = None
|
||||
with open(current_app.config["PERMISSIONS_FILE_FULLPATH"]) as file:
|
||||
permission_configs = yaml.safe_load(file)
|
||||
|
||||
if "groups" in permission_configs:
|
||||
for group_identifier, group_config in permission_configs["groups"].items():
|
||||
group = GroupModel.query.filter_by(identifier=group_identifier).first()
|
||||
if group is None:
|
||||
group = GroupModel(identifier=group_identifier)
|
||||
db.session.add(group)
|
||||
db.session.commit()
|
||||
UserService.create_principal(group.id, id_column_name="group_id")
|
||||
for username in group_config["users"]:
|
||||
user = UserModel.query.filter_by(username=username).first()
|
||||
if user is None:
|
||||
if raise_if_missing_user:
|
||||
raise (
|
||||
UserNotFoundError(
|
||||
f"Could not find a user with name: {username}"
|
||||
)
|
||||
)
|
||||
continue
|
||||
user_group_assignemnt = UserGroupAssignmentModel(
|
||||
user_id=user.id, group_id=group.id
|
||||
)
|
||||
db.session.add(user_group_assignemnt)
|
||||
db.session.commit()
|
||||
|
||||
if "permissions" in permission_configs:
|
||||
for _permission_identifier, permission_config in permission_configs[
|
||||
"permissions"
|
||||
].items():
|
||||
uri = permission_config["uri"]
|
||||
uri_with_percent = re.sub(r"\*", "%", uri)
|
||||
permission_target = PermissionTargetModel.query.filter_by(
|
||||
uri=uri_with_percent
|
||||
).first()
|
||||
if permission_target is None:
|
||||
permission_target = PermissionTargetModel(uri=uri_with_percent)
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
|
||||
for allowed_permission in permission_config["allowed_permissions"]:
|
||||
if "groups" in permission_config:
|
||||
for group_identifier in permission_config["groups"]:
|
||||
principal = (
|
||||
PrincipalModel.query.join(GroupModel)
|
||||
.filter(GroupModel.identifier == group_identifier)
|
||||
.first()
|
||||
)
|
||||
cls.create_permission_for_principal(
|
||||
principal, permission_target, allowed_permission
|
||||
)
|
||||
if "users" in permission_config:
|
||||
for username in permission_config["users"]:
|
||||
principal = (
|
||||
PrincipalModel.query.join(UserModel)
|
||||
.filter(UserModel.username == username)
|
||||
.first()
|
||||
)
|
||||
cls.create_permission_for_principal(
|
||||
principal, permission_target, allowed_permission
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create_permission_for_principal(
|
||||
cls,
|
||||
principal: PrincipalModel,
|
||||
permission_target: PermissionTargetModel,
|
||||
permission: str,
|
||||
) -> PermissionAssignmentModel | Any:
|
||||
"""Create_permission_for_principal."""
|
||||
permission_assignment = PermissionAssignmentModel.query.filter_by(
|
||||
principal_id=principal.id,
|
||||
permission_target_id=permission_target.id,
|
||||
permission=permission,
|
||||
).first()
|
||||
if permission_assignment is None:
|
||||
permission_assignment = PermissionAssignmentModel(
|
||||
principal_id=principal.id,
|
||||
permission_target_id=permission_target.id,
|
||||
permission=permission,
|
||||
grant_type="permit",
|
||||
)
|
||||
db.session.add(permission_assignment)
|
||||
db.session.commit()
|
||||
return permission_assignment
|
||||
|
||||
# def refresh_token(self, token: str) -> str:
|
||||
# """Refresh_token."""
|
||||
# # if isinstance(token, str):
|
||||
|
|
|
@ -270,13 +270,17 @@ class UserService:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def create_principal(cls, user_id: int) -> PrincipalModel:
|
||||
def create_principal(
|
||||
cls, child_id: int, id_column_name: str = "user_id"
|
||||
) -> PrincipalModel:
|
||||
"""Create_principal."""
|
||||
principal: Optional[PrincipalModel] = PrincipalModel.query.filter_by(
|
||||
user_id=user_id
|
||||
column = PrincipalModel.__table__.columns[id_column_name]
|
||||
principal: Optional[PrincipalModel] = PrincipalModel.query.filter(
|
||||
column == child_id
|
||||
).first()
|
||||
if principal is None:
|
||||
principal = PrincipalModel(user_id=user_id)
|
||||
principal = PrincipalModel()
|
||||
setattr(principal, id_column_name, child_id)
|
||||
db.session.add(principal)
|
||||
try:
|
||||
db.session.commit()
|
||||
|
@ -285,7 +289,7 @@ class UserService:
|
|||
current_app.logger.error(f"Exception in create_principal: {e}")
|
||||
raise ApiError(
|
||||
error_code="add_principal_error",
|
||||
message=f"Could not create principal {user_id}",
|
||||
message=f"Could not create principal {child_id}",
|
||||
) from e
|
||||
return principal
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ from spiffworkflow_backend.models.process_model import NotificationType
|
|||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
|
||||
|
@ -262,3 +263,18 @@ class BaseTest:
|
|||
)
|
||||
with open(file_full_path, "rb") as file:
|
||||
return file.read()
|
||||
|
||||
def assert_user_has_permission(
|
||||
self,
|
||||
user: UserModel,
|
||||
permission: str,
|
||||
target_uri: str,
|
||||
expected_result: bool = True,
|
||||
) -> None:
|
||||
"""Assert_user_has_permission."""
|
||||
has_permission = AuthorizationService.user_has_permission(
|
||||
user=user,
|
||||
permission=permission,
|
||||
target_uri=target_uri,
|
||||
)
|
||||
assert has_permission is expected_result
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
"""Test_message_service."""
|
||||
|
||||
import pytest
|
||||
from flask import Flask
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from spiffworkflow_backend.models.user import UserNotFoundError
|
||||
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
|
||||
|
||||
class TestAuthorizationService(BaseTest):
|
||||
"""TestAuthorizationService."""
|
||||
|
||||
def test_can_raise_if_missing_user(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_can_raise_if_missing_user."""
|
||||
with pytest.raises(UserNotFoundError):
|
||||
AuthorizationService.import_permissions_from_yaml_file(raise_if_missing_user=True)
|
||||
|
||||
def test_can_import_permissions_from_yaml(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_can_import_permissions_from_yaml."""
|
||||
usernames = ['testadmin1', 'testadmin2', 'testuser1', 'testuser2', 'testuser3', 'testuser4']
|
||||
users = {}
|
||||
for username in usernames:
|
||||
user = self.find_or_create_user(username=username)
|
||||
users[username] = user
|
||||
|
||||
AuthorizationService.import_permissions_from_yaml_file()
|
||||
assert len(users['testadmin1'].groups) == 1
|
||||
assert users['testadmin1'].groups[0].identifier == 'admin'
|
||||
assert len(users['testuser1'].groups) == 1
|
||||
assert users['testuser1'].groups[0].identifier == 'finance'
|
||||
assert len(users['testuser2'].groups) == 2
|
||||
|
||||
self.assert_user_has_permission(users['testuser1'], 'update', "/v1.0/process-groups/finance/model1")
|
||||
self.assert_user_has_permission(users['testuser1'], 'update', "/v1.0/process-groups/finance/")
|
||||
self.assert_user_has_permission(users['testuser1'], 'update', "/v1.0/process-groups/", expected_result=False)
|
||||
self.assert_user_has_permission(users['testuser4'], 'update', "/v1.0/process-groups/finance/model1")
|
||||
self.assert_user_has_permission(users['testuser2'], 'update', "/v1.0/process-groups/finance/model1")
|
||||
self.assert_user_has_permission(users['testuser2'], 'update', "/v1.0/process-groups/", expected_result=False)
|
||||
self.assert_user_has_permission(users['testuser2'], 'read', "/v1.0/process-groups/")
|
|
@ -4,14 +4,19 @@ from flask.app import Flask
|
|||
from flask_bpmn.models.db import db
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.permission_target import InvalidPermissionTargetUri, PermissionTargetModel
|
||||
from spiffworkflow_backend.models.permission_target import (
|
||||
InvalidPermissionTargetUriError,
|
||||
)
|
||||
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
|
||||
|
||||
class TestPermissionTarget(BaseTest):
|
||||
"""TestPermissionTarget."""
|
||||
|
||||
def test_asterisk_must_go_at_the_end_of_uri(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_asterisk_must_go_at_the_end_of_uri."""
|
||||
permission_target = PermissionTargetModel(uri="/test_group/%")
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
|
@ -20,8 +25,8 @@ class TestPermissionTarget(BaseTest):
|
|||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
|
||||
with pytest.raises(InvalidPermissionTargetUri) as exception:
|
||||
with pytest.raises(InvalidPermissionTargetUriError) as exception:
|
||||
PermissionTargetModel(uri="/test_group/%/model")
|
||||
assert (
|
||||
str(exception.value) == "Invalid Permission Target Uri: /test_group/%/model"
|
||||
str(exception.value) == "Wildcard must appear at end: /test_group/%/model"
|
||||
)
|
||||
|
|
|
@ -8,7 +8,6 @@ from spiffworkflow_backend.models.group import GroupModel
|
|||
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
|
||||
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
from spiffworkflow_backend.models.principal import PrincipalModel
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
|
||||
|
||||
|
@ -74,22 +73,17 @@ class TestPermissions(BaseTest):
|
|||
db.session.add(permission_assignment)
|
||||
db.session.commit()
|
||||
|
||||
has_permission_to_a = AuthorizationService.user_has_permission(
|
||||
user=group_a_admin,
|
||||
permission="update",
|
||||
target_uri=f"/{process_group_a_id}",
|
||||
self.assert_user_has_permission(
|
||||
group_a_admin, "update", f"/{process_group_a_id}"
|
||||
)
|
||||
assert has_permission_to_a is True
|
||||
has_permission_to_b = AuthorizationService.user_has_permission(
|
||||
user=group_a_admin,
|
||||
permission="update",
|
||||
target_uri=f"/{process_group_b_id}",
|
||||
self.assert_user_has_permission(
|
||||
group_a_admin, "update", f"/{process_group_b_id}", expected_result=False
|
||||
)
|
||||
assert has_permission_to_b is False
|
||||
|
||||
def test_user_can_be_granted_access_through_a_group(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_user_can_be_granted_access_through_a_group."""
|
||||
process_group_ids = ["group-a", "group-b"]
|
||||
process_group_a_id = process_group_ids[0]
|
||||
process_group_ids[1]
|
||||
|
@ -122,16 +116,12 @@ class TestPermissions(BaseTest):
|
|||
db.session.add(permission_assignment)
|
||||
db.session.commit()
|
||||
|
||||
has_permission_to_a = AuthorizationService.user_has_permission(
|
||||
user=user,
|
||||
permission="update",
|
||||
target_uri=f"/{process_group_a_id}",
|
||||
)
|
||||
assert has_permission_to_a is True
|
||||
self.assert_user_has_permission(user, "update", f"/{process_group_a_id}")
|
||||
|
||||
def test_user_can_be_read_models_with_global_permission(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_user_can_be_read_models_with_global_permission."""
|
||||
process_group_ids = ["group-a", "group-b"]
|
||||
process_group_a_id = process_group_ids[0]
|
||||
process_group_b_id = process_group_ids[1]
|
||||
|
@ -142,7 +132,7 @@ class TestPermissions(BaseTest):
|
|||
)
|
||||
group_a_admin = self.find_or_create_user()
|
||||
|
||||
permission_target = PermissionTargetModel(uri=f"/%")
|
||||
permission_target = PermissionTargetModel(uri="/%")
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
|
||||
|
@ -155,15 +145,9 @@ class TestPermissions(BaseTest):
|
|||
db.session.add(permission_assignment)
|
||||
db.session.commit()
|
||||
|
||||
has_permission_to_a = AuthorizationService.user_has_permission(
|
||||
user=group_a_admin,
|
||||
permission="update",
|
||||
target_uri=f"/{process_group_a_id}",
|
||||
self.assert_user_has_permission(
|
||||
group_a_admin, "update", f"/{process_group_a_id}"
|
||||
)
|
||||
assert has_permission_to_a is True
|
||||
has_permission_to_b = AuthorizationService.user_has_permission(
|
||||
user=group_a_admin,
|
||||
permission="update",
|
||||
target_uri=f"/{process_group_b_id}",
|
||||
self.assert_user_has_permission(
|
||||
group_a_admin, "update", f"/{process_group_b_id}"
|
||||
)
|
||||
assert has_permission_to_b is True
|
||||
|
|
Loading…
Reference in New Issue