pyl passes w/ burnettk

This commit is contained in:
jasquat 2022-12-22 16:42:52 -05:00
parent 9234af5e34
commit 197a823220
10 changed files with 76 additions and 320 deletions

View File

@ -12,9 +12,6 @@ from spiffworkflow_backend.models.script_attributes_context import (
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
# add_permission("read", "test/*", "Editors")
class GetAllPermissions(Script): class GetAllPermissions(Script):
"""GetAllPermissions.""" """GetAllPermissions."""

View File

@ -7,10 +7,9 @@ from spiffworkflow_backend.models.script_attributes_context import (
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
# add_permission("read", "test/*", "Editors")
class RecreatePermissions(Script): class RecreatePermissions(Script):
"""RecreatePermissions."""
def get_description(self) -> str: def get_description(self) -> str:
"""Get_description.""" """Get_description."""

View File

@ -1,13 +1,14 @@
"""Authorization_service.""" """Authorization_service."""
import inspect import inspect
from typing import TypedDict
from typing import Any, Set
import re import re
from dataclasses import dataclass from dataclasses import dataclass
from hashlib import sha256 from hashlib import sha256
from hmac import compare_digest from hmac import compare_digest
from hmac import HMAC from hmac import HMAC
from typing import Any
from typing import Optional from typing import Optional
from typing import Set
from typing import TypedDict
from typing import Union from typing import Union
import jwt import jwt
@ -23,7 +24,6 @@ from sqlalchemy import or_
from sqlalchemy import text from sqlalchemy import text
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
from spiffworkflow_backend.models import permission_assignment
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
@ -72,6 +72,8 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
class DesiredPermissionDict(TypedDict): class DesiredPermissionDict(TypedDict):
"""DesiredPermissionDict."""
group_identifiers: Set[str] group_identifiers: Set[str]
permission_assignments: list[PermissionAssignmentModel] permission_assignments: list[PermissionAssignmentModel]
@ -236,9 +238,13 @@ class AuthorizationService:
for group_identifier in permission_config["groups"]: for group_identifier in permission_config["groups"]:
group = GroupService.find_or_create_group(group_identifier) group = GroupService.find_or_create_group(group_identifier)
unique_user_group_identifiers.add(group_identifier) unique_user_group_identifiers.add(group_identifier)
permission_assignments.append(cls.create_permission_for_principal( permission_assignments.append(
group.principal, permission_target, allowed_permission cls.create_permission_for_principal(
)) group.principal,
permission_target,
allowed_permission,
)
)
if "users" in permission_config: if "users" in permission_config:
for username in permission_config["users"]: for username in permission_config["users"]:
user = UserModel.query.filter_by(username=username).first() user = UserModel.query.filter_by(username=username).first()
@ -248,15 +254,20 @@ class AuthorizationService:
.filter(UserModel.username == username) .filter(UserModel.username == username)
.first() .first()
) )
permission_assignments.append(cls.create_permission_for_principal( permission_assignments.append(
cls.create_permission_for_principal(
principal, permission_target, allowed_permission principal, permission_target, allowed_permission
)) )
)
if default_group is not None: if default_group is not None:
for user in UserModel.query.all(): for user in UserModel.query.all():
cls.associate_user_with_group(user, default_group) cls.associate_user_with_group(user, default_group)
return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments } return {
"group_identifiers": unique_user_group_identifiers,
"permission_assignments": permission_assignments,
}
@classmethod @classmethod
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel: def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
@ -715,9 +726,11 @@ class AuthorizationService:
permission_target = cls.find_or_create_permission_target( permission_target = cls.find_or_create_permission_target(
permission_to_assign.target_uri permission_to_assign.target_uri
) )
permission_assignments.append(cls.create_permission_for_principal( permission_assignments.append(
cls.create_permission_for_principal(
group.principal, permission_target, permission_to_assign.permission group.principal, permission_target, permission_to_assign.permission
)) )
)
return permission_assignments return permission_assignments
@classmethod @classmethod
@ -725,24 +738,32 @@ class AuthorizationService:
"""Adds new permission assignments and deletes old ones.""" """Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all() initial_permission_assignments = PermissionAssignmentModel.query.all()
result = cls.import_permissions_from_yaml_file() result = cls.import_permissions_from_yaml_file()
desired_permission_assignments = result['permission_assignments'] desired_permission_assignments = result["permission_assignments"]
desired_group_identifiers = result['group_identifiers'] desired_group_identifiers = result["group_identifiers"]
for group in group_info: for group in group_info:
for username in group['users']: for username in group["users"]:
GroupService.add_user_to_group_or_add_to_waiting(username, group['name']) GroupService.add_user_to_group_or_add_to_waiting(
for permission in group['permissions']: username, group["name"]
for crud_op in permission['actions']: )
desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro( for permission in group["permissions"]:
group_identifier=group['name'], target=permission['uri'], permission=crud_op for crud_op in permission["actions"]:
)) desired_permission_assignments.extend(
desired_group_identifiers.add(group['name']) cls.add_permission_from_uri_or_macro(
group_identifier=group["name"],
target=permission["uri"],
permission=crud_op,
)
)
desired_group_identifiers.add(group["name"])
for ipa in initial_permission_assignments: for ipa in initial_permission_assignments:
if ipa not in desired_permission_assignments: if ipa not in desired_permission_assignments:
db.session.delete(ipa) db.session.delete(ipa)
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all() groups_to_delete = GroupModel.query.filter(
GroupModel.identifier.not_in(desired_group_identifiers)
).all()
for gtd in groups_to_delete: for gtd in groups_to_delete:
db.session.delete(gtd) db.session.delete(gtd)
db.session.commit() db.session.commit()

View File

@ -1,10 +1,10 @@
"""Group_service.""" """Group_service."""
from typing import Optional from typing import Optional
from spiffworkflow_backend.models.user import UserModel
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.user_service import UserService
@ -25,7 +25,10 @@ class GroupService:
return group return group
@classmethod @classmethod
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None: def add_user_to_group_or_add_to_waiting(
cls, username: str, group_identifier: str
) -> None:
"""Add_user_to_group_or_add_to_waiting."""
group = cls.find_or_create_group(group_identifier) group = cls.find_or_create_group(group_identifier)
user = UserModel.query.filter_by(username=username).first() user = UserModel.query.filter_by(username=username).first()
if user: if user:

View File

@ -1,39 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_02u675m" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_01cweoc</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_01cweoc" sourceRef="StartEvent_1" targetRef="add_permission_script" />
<bpmn:endEvent id="Event_11584qn">
<bpmn:incoming>Flow_1xle2yo</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1xle2yo" sourceRef="add_permission_script" targetRef="Event_11584qn" />
<bpmn:scriptTask id="add_permission_script" name="Add Permission">
<bpmn:incoming>Flow_01cweoc</bpmn:incoming>
<bpmn:outgoing>Flow_1xle2yo</bpmn:outgoing>
<bpmn:script>add_permission('read', '/test_permission_uri', "test_group")</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_02u675m">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_11584qn_di" bpmnElement="Event_11584qn">
<dc:Bounds x="432" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1ymj79t_di" bpmnElement="add_permission_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_01cweoc_di" bpmnElement="Flow_01cweoc">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1xle2yo_di" bpmnElement="Flow_1xle2yo">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -1,100 +0,0 @@
"""Test_get_localtime."""
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.api.api_error import ApiError
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.add_permission import AddPermission
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class TestAddPermission(BaseTest):
"""TestAddPermission."""
def test_can_add_permission(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
self.find_or_create_user("test_user")
# now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
group = GroupModel.query.filter(
GroupModel.identifier == "my_test_group"
).first()
permission_target = PermissionTargetModel.query.filter(
PermissionTargetModel.uri == "/test_add_permission/%"
).first()
assert group is None
assert permission_target is None
AddPermission().run(
script_attributes_context, "read", "/test_add_permission/*", "my_test_group"
)
group = GroupModel.query.filter(
GroupModel.identifier == "my_test_group"
).first()
permission_target = PermissionTargetModel.query.filter(
PermissionTargetModel.uri == "/test_add_permission/%"
).first()
permission_assignments = PermissionAssignmentModel.query.filter(
PermissionAssignmentModel.principal_id == group.principal.id
).all()
assert group is not None
assert permission_target is not None
assert len(permission_assignments) == 1
def test_add_permission_script_through_bpmn(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_add_permission_script_through_bpmn."""
basic_user = self.find_or_create_user("basic_user")
privileged_user = self.find_or_create_user("privileged_user")
self.add_permissions_to_user(
privileged_user,
target_uri="/can-run-privileged-script/add_permission",
permission_names=["create"],
)
process_model = load_test_spec(
process_model_id="add_permission",
process_model_source_directory="script_add_permission",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=basic_user
)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as exception:
processor.do_engine_steps(save=True)
assert "ScriptUnauthorizedForUserError" in str(exception)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=privileged_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert process_instance.status == "complete"

View File

@ -1,69 +0,0 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment_waiting import (
UserGroupAssignmentWaitingModel,
)
from spiffworkflow_backend.scripts.add_user_to_group import AddUserToGroup
class TestAddUserToGroup(BaseTest):
"""TestGetGroupMembers."""
def test_can_add_existing_user_to_existing_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
my_user = self.find_or_create_user("my_user")
my_group = GroupModel(identifier="my_group")
db.session.add(my_group)
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
AddUserToGroup().run(
script_attributes_context, my_user.username, my_group.identifier
)
assert my_user in my_group.users
def test_can_add_non_existent_user_to_non_existent_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_add_non_existent_user_to_non_existent_group."""
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
AddUserToGroup().run(
script_attributes_context, "dan@sartography.com", "competent-joes"
)
my_group = GroupModel.query.filter(
GroupModel.identifier == "competent-joes"
).first()
assert my_group is not None
waiting_assignments = (
UserGroupAssignmentWaitingModel()
.query.filter_by(username="dan@sartography.com")
.first()
)
assert waiting_assignments is not None

View File

@ -1,59 +0,0 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService
class TestDeletePermissions(BaseTest):
"""TestGetGroupMembers."""
def test_can_delete_members(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
self.find_or_create_user("initiator_user")
testuser1 = self.find_or_create_user("testuser1")
testuser2 = self.find_or_create_user("testuser2")
testuser3 = self.find_or_create_user("testuser3")
group_a = GroupService.find_or_create_group("groupA")
group_b = GroupService.find_or_create_group("groupB")
db.session.add(group_a)
db.session.add(group_b)
db.session.commit()
UserService.add_user_to_group(testuser1, group_a)
UserService.add_user_to_group(testuser2, group_a)
UserService.add_user_to_group(testuser3, group_b)
target = PermissionTargetModel("test/*")
db.session.add(target)
db.session.commit()
AuthorizationService.create_permission_for_principal(
group_a.principal, target, "read"
)
# now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
ClearPermissions().run(script_attributes_context)
groups = GroupModel.query.all()
assert 0 == len(groups)

View File

@ -7,8 +7,8 @@ from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext, ScriptAttributesContext,
) )
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.add_permission import AddPermission
from spiffworkflow_backend.scripts.get_all_permissions import GetAllPermissions from spiffworkflow_backend.scripts.get_all_permissions import GetAllPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
class TestGetAllPermissions(BaseTest): class TestGetAllPermissions(BaseTest):
@ -31,10 +31,12 @@ class TestGetAllPermissions(BaseTest):
process_instance_id=1, process_instance_id=1,
process_model_identifier="my_test_user", process_model_identifier="my_test_user",
) )
AddPermission().run( AuthorizationService.add_permission_from_uri_or_macro(
script_attributes_context, "start", "PG:hey:group", "my_test_group" permission="start", target="PG:hey:group", group_identifier="my_test_group"
)
AuthorizationService.add_permission_from_uri_or_macro(
permission="all", target="/tasks", group_identifier="my_test_group"
) )
AddPermission().run(script_attributes_context, "all", "/tasks", "my_test_group")
expected_permissions = [ expected_permissions = [
{ {

View File

@ -1,10 +1,10 @@
"""Test_message_service.""" """Test_message_service."""
import pytest import pytest
from spiffworkflow_backend.models.group import GroupModel
from flask import Flask from flask import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import AuthorizationService
@ -436,6 +436,7 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
"""Test_can_refresh_permissions."""
user = self.find_or_create_user(username="user_one") user = self.find_or_create_user(username="user_one")
admin_user = self.find_or_create_user(username="testadmin1") admin_user = self.find_or_create_user(username="testadmin1")
@ -443,14 +444,13 @@ class TestAuthorizationService(BaseTest):
GroupService.find_or_create_group("group_two") GroupService.find_or_create_group("group_two")
assert GroupModel.query.filter_by(identifier="group_two").first() is not None assert GroupModel.query.filter_by(identifier="group_two").first() is not None
group_info = [{ group_info = [
'users': ['user_one'], {
'name': 'group_one', "users": ["user_one"],
'permissions': [{ "name": "group_one",
'actions': ['create', 'read'], "permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}],
'uri': 'PG:hey' }
}] ]
}]
AuthorizationService.refresh_permissions(group_info) AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_two").first() is None assert GroupModel.query.filter_by(identifier="group_two").first() is None
assert GroupModel.query.filter_by(identifier="group_one").first() is not None assert GroupModel.query.filter_by(identifier="group_one").first() is not None
@ -459,17 +459,18 @@ class TestAuthorizationService(BaseTest):
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo") self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
group_info = [{ group_info = [
'users': ['user_one'], {
'name': 'group_one', "users": ["user_one"],
'permissions': [{ "name": "group_one",
'actions': ['read'], "permissions": [{"actions": ["read"], "uri": "PG:hey"}],
'uri': 'PG:hey' }
}] ]
}]
AuthorizationService.refresh_permissions(group_info) AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_one").first() is not None assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False) self.assert_user_has_permission(
user, "create", "/v1.0/process-groups/hey:yo", expected_result=False
)
self.assert_user_has_permission(admin_user, "create", "/anything-they-want") self.assert_user_has_permission(admin_user, "create", "/anything-they-want")