attempting to see if sql like statement works in other dbs as well w/ burnettk

This commit is contained in:
jasquat 2022-10-11 11:43:52 -04:00
parent 50fadc55e7
commit 621ad3ef2e
4 changed files with 85 additions and 2 deletions

View File

@ -1,4 +1,7 @@
"""PermissionTarget."""
import re
from typing import Any
from sqlalchemy.orm import validates
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
@ -9,6 +12,9 @@ from flask_bpmn.models.db import SpiffworkflowBaseDBModel
# from spiffworkflow_backend.models.process_model import ProcessModel
class InvalidPermissionTargetUri(Exception):
pass
class PermissionTargetModel(SpiffworkflowBaseDBModel):
"""PermissionTargetModel."""
@ -24,3 +30,11 @@ class PermissionTargetModel(SpiffworkflowBaseDBModel):
# process_group_id = db.Column(ForeignKey(ProcessGroupModel.id), nullable=True) # type: ignore
# process_model_identifier = db.Column(ForeignKey(ProcessModel.id), nullable=True) # type: ignore
# process_instance_id = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore
@validates("uri")
def validate_uri(self, key: str, value: str) -> str:
if re.search(r"\*.", value):
raise InvalidPermissionTargetUri(
f"Invalid Permission Target Uri: {value}"
)
return value

View File

@ -1,6 +1,9 @@
"""Authorization_service."""
import re
from typing import Union
from sqlalchemy import text
import jwt
from flask import current_app
from flask_bpmn.api.api_error import ApiError
@ -21,13 +24,14 @@ class AuthorizationService:
) -> bool:
"""Has_permission."""
principal_ids = [p.id for p in principals]
permission_assignments = (
PermissionAssignmentModel.query.filter(
PermissionAssignmentModel.principal_id.in_(principal_ids)
)
.filter_by(permission=permission)
.join(PermissionTargetModel)
.filter_by(uri=target_uri)
.filter(text(f"'{target_uri}' LIKE `permission_target`.`uri`"))
.all()
)

View File

@ -0,0 +1,27 @@
"""Process Model."""
import pytest
from flask.app import Flask
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.permission_target import InvalidPermissionTargetUri, PermissionTargetModel
class TestPermissionTarget(BaseTest):
def test_asterisk_must_go_at_the_end_of_uri(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
permission_target = PermissionTargetModel(uri="/test_group/*")
db.session.add(permission_target)
db.session.commit()
permission_target = PermissionTargetModel(uri="/test_group")
db.session.add(permission_target)
db.session.commit()
with pytest.raises(InvalidPermissionTargetUri) as exception:
PermissionTargetModel(uri="/test_group/*/model")
assert (
str(exception.value) == "Invalid Permission Target Uri: /test_group/*/model"
)

View File

@ -90,7 +90,6 @@ class TestPermissions(BaseTest):
def test_user_can_be_granted_access_through_a_group(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_group_a_admin_needs_to_stay_away_from_group_b."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_ids[1]
@ -129,3 +128,42 @@ class TestPermissions(BaseTest):
target_uri=f"/{process_group_a_id}",
)
assert has_permission_to_a is True
def test_user_can_be_read_models_with_global_permission(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_b_id = process_group_ids[1]
for process_group_id in process_group_ids:
load_test_spec(
"timers_intermediate_catch_event",
process_group_id=process_group_id,
)
group_a_admin = self.find_or_create_user()
permission_target = PermissionTargetModel(uri=f"/%")
db.session.add(permission_target)
db.session.commit()
permission_assignment = PermissionAssignmentModel(
permission_target_id=permission_target.id,
principal_id=group_a_admin.principal.id,
permission="update",
grant_type="permit",
)
db.session.add(permission_assignment)
db.session.commit()
has_permission_to_a = AuthorizationService.user_has_permission(
user=group_a_admin,
permission="update",
target_uri=f"/{process_group_a_id}",
)
assert has_permission_to_a is True
has_permission_to_b = AuthorizationService.user_has_permission(
user=group_a_admin,
permission="update",
target_uri=f"/{process_group_b_id}",
)
assert has_permission_to_b is True