pyl passes w/ burnettk

This commit is contained in:
jasquat 2022-12-22 16:42:52 -05:00
parent c7590fa440
commit 4f1054c307
10 changed files with 76 additions and 320 deletions

View File

@ -12,9 +12,6 @@ from spiffworkflow_backend.models.script_attributes_context import (
from spiffworkflow_backend.scripts.script import Script
# add_permission("read", "test/*", "Editors")
class GetAllPermissions(Script):
"""GetAllPermissions."""

View File

@ -7,10 +7,9 @@ from spiffworkflow_backend.models.script_attributes_context import (
from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.authorization_service import AuthorizationService
# add_permission("read", "test/*", "Editors")
class RecreatePermissions(Script):
"""RecreatePermissions."""
def get_description(self) -> str:
"""Get_description."""

View File

@ -1,13 +1,14 @@
"""Authorization_service."""
import inspect
from typing import TypedDict
from typing import Any, Set
import re
from dataclasses import dataclass
from hashlib import sha256
from hmac import compare_digest
from hmac import HMAC
from typing import Any
from typing import Optional
from typing import Set
from typing import TypedDict
from typing import Union
import jwt
@ -23,7 +24,6 @@ from sqlalchemy import or_
from sqlalchemy import text
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
from spiffworkflow_backend.models import permission_assignment
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
@ -72,6 +72,8 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
class DesiredPermissionDict(TypedDict):
"""DesiredPermissionDict."""
group_identifiers: Set[str]
permission_assignments: list[PermissionAssignmentModel]
@ -236,9 +238,13 @@ class AuthorizationService:
for group_identifier in permission_config["groups"]:
group = GroupService.find_or_create_group(group_identifier)
unique_user_group_identifiers.add(group_identifier)
permission_assignments.append(cls.create_permission_for_principal(
group.principal, permission_target, allowed_permission
))
permission_assignments.append(
cls.create_permission_for_principal(
group.principal,
permission_target,
allowed_permission,
)
)
if "users" in permission_config:
for username in permission_config["users"]:
user = UserModel.query.filter_by(username=username).first()
@ -248,15 +254,20 @@ class AuthorizationService:
.filter(UserModel.username == username)
.first()
)
permission_assignments.append(cls.create_permission_for_principal(
permission_assignments.append(
cls.create_permission_for_principal(
principal, permission_target, allowed_permission
))
)
)
if default_group is not None:
for user in UserModel.query.all():
cls.associate_user_with_group(user, default_group)
return { 'group_identifiers': unique_user_group_identifiers, 'permission_assignments': permission_assignments }
return {
"group_identifiers": unique_user_group_identifiers,
"permission_assignments": permission_assignments,
}
@classmethod
def find_or_create_permission_target(cls, uri: str) -> PermissionTargetModel:
@ -715,9 +726,11 @@ class AuthorizationService:
permission_target = cls.find_or_create_permission_target(
permission_to_assign.target_uri
)
permission_assignments.append(cls.create_permission_for_principal(
permission_assignments.append(
cls.create_permission_for_principal(
group.principal, permission_target, permission_to_assign.permission
))
)
)
return permission_assignments
@classmethod
@ -725,24 +738,32 @@ class AuthorizationService:
"""Adds new permission assignments and deletes old ones."""
initial_permission_assignments = PermissionAssignmentModel.query.all()
result = cls.import_permissions_from_yaml_file()
desired_permission_assignments = result['permission_assignments']
desired_group_identifiers = result['group_identifiers']
desired_permission_assignments = result["permission_assignments"]
desired_group_identifiers = result["group_identifiers"]
for group in group_info:
for username in group['users']:
GroupService.add_user_to_group_or_add_to_waiting(username, group['name'])
for permission in group['permissions']:
for crud_op in permission['actions']:
desired_permission_assignments.extend(cls.add_permission_from_uri_or_macro(
group_identifier=group['name'], target=permission['uri'], permission=crud_op
))
desired_group_identifiers.add(group['name'])
for username in group["users"]:
GroupService.add_user_to_group_or_add_to_waiting(
username, group["name"]
)
for permission in group["permissions"]:
for crud_op in permission["actions"]:
desired_permission_assignments.extend(
cls.add_permission_from_uri_or_macro(
group_identifier=group["name"],
target=permission["uri"],
permission=crud_op,
)
)
desired_group_identifiers.add(group["name"])
for ipa in initial_permission_assignments:
if ipa not in desired_permission_assignments:
db.session.delete(ipa)
groups_to_delete = GroupModel.query.filter(GroupModel.identifier.not_in(desired_group_identifiers)).all()
groups_to_delete = GroupModel.query.filter(
GroupModel.identifier.not_in(desired_group_identifiers)
).all()
for gtd in groups_to_delete:
db.session.delete(gtd)
db.session.commit()

View File

@ -1,10 +1,10 @@
"""Group_service."""
from typing import Optional
from spiffworkflow_backend.models.user import UserModel
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.user_service import UserService
@ -25,7 +25,10 @@ class GroupService:
return group
@classmethod
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
def add_user_to_group_or_add_to_waiting(
cls, username: str, group_identifier: str
) -> None:
"""Add_user_to_group_or_add_to_waiting."""
group = cls.find_or_create_group(group_identifier)
user = UserModel.query.filter_by(username=username).first()
if user:

View File

@ -1,39 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_02u675m" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_01cweoc</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_01cweoc" sourceRef="StartEvent_1" targetRef="add_permission_script" />
<bpmn:endEvent id="Event_11584qn">
<bpmn:incoming>Flow_1xle2yo</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1xle2yo" sourceRef="add_permission_script" targetRef="Event_11584qn" />
<bpmn:scriptTask id="add_permission_script" name="Add Permission">
<bpmn:incoming>Flow_01cweoc</bpmn:incoming>
<bpmn:outgoing>Flow_1xle2yo</bpmn:outgoing>
<bpmn:script>add_permission('read', '/test_permission_uri', "test_group")</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_02u675m">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_11584qn_di" bpmnElement="Event_11584qn">
<dc:Bounds x="432" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1ymj79t_di" bpmnElement="add_permission_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_01cweoc_di" bpmnElement="Flow_01cweoc">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1xle2yo_di" bpmnElement="Flow_1xle2yo">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -1,100 +0,0 @@
"""Test_get_localtime."""
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.api.api_error import ApiError
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.add_permission import AddPermission
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class TestAddPermission(BaseTest):
"""TestAddPermission."""
def test_can_add_permission(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
self.find_or_create_user("test_user")
# now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
group = GroupModel.query.filter(
GroupModel.identifier == "my_test_group"
).first()
permission_target = PermissionTargetModel.query.filter(
PermissionTargetModel.uri == "/test_add_permission/%"
).first()
assert group is None
assert permission_target is None
AddPermission().run(
script_attributes_context, "read", "/test_add_permission/*", "my_test_group"
)
group = GroupModel.query.filter(
GroupModel.identifier == "my_test_group"
).first()
permission_target = PermissionTargetModel.query.filter(
PermissionTargetModel.uri == "/test_add_permission/%"
).first()
permission_assignments = PermissionAssignmentModel.query.filter(
PermissionAssignmentModel.principal_id == group.principal.id
).all()
assert group is not None
assert permission_target is not None
assert len(permission_assignments) == 1
def test_add_permission_script_through_bpmn(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_add_permission_script_through_bpmn."""
basic_user = self.find_or_create_user("basic_user")
privileged_user = self.find_or_create_user("privileged_user")
self.add_permissions_to_user(
privileged_user,
target_uri="/can-run-privileged-script/add_permission",
permission_names=["create"],
)
process_model = load_test_spec(
process_model_id="add_permission",
process_model_source_directory="script_add_permission",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=basic_user
)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as exception:
processor.do_engine_steps(save=True)
assert "ScriptUnauthorizedForUserError" in str(exception)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=privileged_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert process_instance.status == "complete"

View File

@ -1,69 +0,0 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment_waiting import (
UserGroupAssignmentWaitingModel,
)
from spiffworkflow_backend.scripts.add_user_to_group import AddUserToGroup
class TestAddUserToGroup(BaseTest):
"""TestGetGroupMembers."""
def test_can_add_existing_user_to_existing_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
my_user = self.find_or_create_user("my_user")
my_group = GroupModel(identifier="my_group")
db.session.add(my_group)
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
AddUserToGroup().run(
script_attributes_context, my_user.username, my_group.identifier
)
assert my_user in my_group.users
def test_can_add_non_existent_user_to_non_existent_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_add_non_existent_user_to_non_existent_group."""
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
AddUserToGroup().run(
script_attributes_context, "dan@sartography.com", "competent-joes"
)
my_group = GroupModel.query.filter(
GroupModel.identifier == "competent-joes"
).first()
assert my_group is not None
waiting_assignments = (
UserGroupAssignmentWaitingModel()
.query.filter_by(username="dan@sartography.com")
.first()
)
assert waiting_assignments is not None

View File

@ -1,59 +0,0 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.clear_permissions import ClearPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.group_service import GroupService
from spiffworkflow_backend.services.user_service import UserService
class TestDeletePermissions(BaseTest):
"""TestGetGroupMembers."""
def test_can_delete_members(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
self.find_or_create_user("initiator_user")
testuser1 = self.find_or_create_user("testuser1")
testuser2 = self.find_or_create_user("testuser2")
testuser3 = self.find_or_create_user("testuser3")
group_a = GroupService.find_or_create_group("groupA")
group_b = GroupService.find_or_create_group("groupB")
db.session.add(group_a)
db.session.add(group_b)
db.session.commit()
UserService.add_user_to_group(testuser1, group_a)
UserService.add_user_to_group(testuser2, group_a)
UserService.add_user_to_group(testuser3, group_b)
target = PermissionTargetModel("test/*")
db.session.add(target)
db.session.commit()
AuthorizationService.create_permission_for_principal(
group_a.principal, target, "read"
)
# now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext(
task=None,
environment_identifier="testing",
process_instance_id=1,
process_model_identifier="my_test_user",
)
ClearPermissions().run(script_attributes_context)
groups = GroupModel.query.all()
assert 0 == len(groups)

View File

@ -7,8 +7,8 @@ from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.add_permission import AddPermission
from spiffworkflow_backend.scripts.get_all_permissions import GetAllPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
class TestGetAllPermissions(BaseTest):
@ -31,10 +31,12 @@ class TestGetAllPermissions(BaseTest):
process_instance_id=1,
process_model_identifier="my_test_user",
)
AddPermission().run(
script_attributes_context, "start", "PG:hey:group", "my_test_group"
AuthorizationService.add_permission_from_uri_or_macro(
permission="start", target="PG:hey:group", group_identifier="my_test_group"
)
AuthorizationService.add_permission_from_uri_or_macro(
permission="all", target="/tasks", group_identifier="my_test_group"
)
AddPermission().run(script_attributes_context, "all", "/tasks", "my_test_group")
expected_permissions = [
{

View File

@ -1,10 +1,10 @@
"""Test_message_service."""
import pytest
from spiffworkflow_backend.models.group import GroupModel
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user import UserNotFoundError
from spiffworkflow_backend.services.authorization_service import AuthorizationService
@ -436,6 +436,7 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_can_refresh_permissions."""
user = self.find_or_create_user(username="user_one")
admin_user = self.find_or_create_user(username="testadmin1")
@ -443,14 +444,13 @@ class TestAuthorizationService(BaseTest):
GroupService.find_or_create_group("group_two")
assert GroupModel.query.filter_by(identifier="group_two").first() is not None
group_info = [{
'users': ['user_one'],
'name': 'group_one',
'permissions': [{
'actions': ['create', 'read'],
'uri': 'PG:hey'
}]
}]
group_info = [
{
"users": ["user_one"],
"name": "group_one",
"permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}],
}
]
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_two").first() is None
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
@ -459,17 +459,18 @@ class TestAuthorizationService(BaseTest):
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
group_info = [{
'users': ['user_one'],
'name': 'group_one',
'permissions': [{
'actions': ['read'],
'uri': 'PG:hey'
}]
}]
group_info = [
{
"users": ["user_one"],
"name": "group_one",
"permissions": [{"actions": ["read"], "uri": "PG:hey"}],
}
]
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False)
self.assert_user_has_permission(
user, "create", "/v1.0/process-groups/hey:yo", expected_result=False
)
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")