Merge pull request #271 from sartography/feature/add_missing_perms_to_elevated

Feature/add missing perms to elevated
This commit is contained in:
Kevin Burnett 2023-05-22 19:03:12 +00:00 committed by GitHub
commit 934d2a1c46
37 changed files with 390 additions and 821 deletions

View File

@ -7,5 +7,20 @@ function error_handler() {
trap 'error_handler ${LINENO} $?' ERR
set -o errtrace -o errexit -o nounset -o pipefail
set -x
mysql -uroot spiffworkflow_backend_development -e 'select pa.id, g.identifier group_identifier, pt.uri, permission from permission_assignment pa join principal p on p.id = pa.principal_id join `group` g on g.id = p.group_id join permission_target pt on pt.id = pa.permission_target_id;'
database=spiffworkflow_backend_local_development
if [[ "${1:-}" == "test" ]]; then
database=spiffworkflow_backend_unit_testing
fi
# shellcheck disable=2016
mysql -uroot "$database" -e '
select u.username user, g.identifier group
FROM `user` u
JOIN `user_group_assignment` uga on uga.user_id = u.id
JOIN `group` g on g.id = uga.group_id;
select pa.id, g.identifier group_identifier, pt.uri, permission from permission_assignment pa
join principal p on p.id = pa.principal_id
join `group` g on g.id = p.group_id
join permission_target pt on pt.id = pa.permission_target_id;
'

View File

@ -4,19 +4,12 @@ import shutil
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -63,64 +56,6 @@ def with_db_and_bpmn_file_cleanup() -> None:
@pytest.fixture()
def with_super_admin_user() -> UserModel:
"""With_super_admin_user."""
return BaseTest.create_user_with_permission("super_admin")
@pytest.fixture()
def setup_process_instances_for_reports(
client: FlaskClient, with_super_admin_user: UserModel
) -> list[ProcessInstanceModel]:
"""Setup_process_instances_for_reports."""
user = with_super_admin_user
process_group_id = "runs_without_input"
process_model_id = "sample"
# bpmn_file_name = "sample.bpmn"
bpmn_file_location = "sample"
process_model_identifier = BaseTest().create_group_and_model_with_bpmn(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model_id=process_model_id,
# bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
)
# BaseTest().create_process_group(
# client=client, user=user, process_group_id=process_group_id, display_name=process_group_id
# )
# process_model_id = "runs_without_input/sample"
# load_test_spec(
# process_model_id=f"{process_group_id}/{process_model_id}",
# process_model_source_directory="sample"
# )
process_instances = []
for data in [kay(), ray(), jay()]:
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
# process_group_identifier=process_group_id,
process_model_identifier=process_model_identifier,
user=user,
)
processor = ProcessInstanceProcessor(process_instance)
processor.slam_in_data(data)
process_instance.status = "complete"
db.session.add(process_instance)
db.session.commit()
process_instances.append(process_instance)
return process_instances
def kay() -> dict:
"""Kay."""
return {"name": "kay", "grade_level": 2, "test_score": 10}
def ray() -> dict:
"""Ray."""
return {"name": "ray", "grade_level": 1, "test_score": 9}
def jay() -> dict:
"""Jay."""
return {"name": "jay", "grade_level": 2, "test_score": 8}
user = BaseTest.find_or_create_user(username="testadmin1")
AuthorizationService.import_permissions_from_yaml_file(user)
return user

View File

@ -149,7 +149,7 @@ paths:
$ref: "#/components/schemas/OkTrue"
/debug/test-raise-error:
get:
post:
operationId: spiffworkflow_backend.routes.debug_controller.test_raise_error
summary: Returns an unhandled exception that should notify sentry, if sentry is configured
tags:
@ -184,7 +184,7 @@ paths:
description: The identifier for the last visited page for the user.
schema:
type: string
get:
post:
tags:
- Active User
operationId: spiffworkflow_backend.routes.active_users_controller.active_user_updates
@ -207,7 +207,7 @@ paths:
description: The identifier for the last visited page for the user.
schema:
type: string
get:
post:
tags:
- Active User
operationId: spiffworkflow_backend.routes.active_users_controller.active_user_unregister
@ -425,7 +425,7 @@ paths:
schema:
$ref: "#/components/schemas/ProcessModel"
/process-models-natural-language/{modified_process_group_id}:
/process-model-natural-language/{modified_process_group_id}:
parameters:
- name: modified_process_group_id
in: path

View File

@ -16,10 +16,18 @@ groups:
users: [testuser2, testuser3, testuser4]
permissions:
admin:
process-groups-all:
groups: [admin]
allowed_permissions: [create, read, update, delete]
uri: /*
allowed_permissions: [all]
uri: PG:ALL
basic:
groups: [admin]
allowed_permissions: [all]
uri: BASIC
elevated-operations:
groups: [admin]
allowed_permissions: [all]
uri: ELEVATED
read-all:
groups: ["Finance Team", hr, admin]

View File

@ -257,7 +257,7 @@ def manual_complete_task(
process_instance = ProcessInstanceModel.query.filter(ProcessInstanceModel.id == int(process_instance_id)).first()
if process_instance:
processor = ProcessInstanceProcessor(process_instance)
processor.manual_complete_task(task_guid, execute)
processor.manual_complete_task(task_guid, execute, g.user)
else:
raise ApiError(
error_code="complete_task",

View File

@ -1,43 +0,0 @@
"""Save process instance metadata."""
from typing import Any
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel,
)
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.scripts.script import Script
class SaveProcessInstanceMetadata(Script):
"""SaveProcessInstanceMetadata."""
def get_description(self) -> str:
"""Get_description."""
return """Save a given dict as process instance metadata (useful for creating reports)."""
def run(
self,
script_attributes_context: ScriptAttributesContext,
*args: Any,
**kwargs: Any,
) -> Any:
"""Run."""
metadata_dict = args[0]
if script_attributes_context.process_instance_id is None:
raise self.get_proces_instance_id_is_missing_error("save_process_instance_metadata")
for key, value in metadata_dict.items():
pim = ProcessInstanceMetadataModel.query.filter_by(
process_instance_id=script_attributes_context.process_instance_id,
key=key,
).first()
if pim is None:
pim = ProcessInstanceMetadataModel(
process_instance_id=script_attributes_context.process_instance_id,
key=key,
)
pim.value = value
db.session.add(pim)
db.session.commit()

View File

@ -79,6 +79,7 @@ PATH_SEGMENTS_FOR_PERMISSION_ALL = [
{"path": "/process-data-file-download", "relevant_permissions": ["read"]},
{"path": "/process-instance-suspend", "relevant_permissions": ["create"]},
{"path": "/process-instance-terminate", "relevant_permissions": ["create"]},
{"path": "/process-model-natural-language", "relevant_permissions": ["create"]},
{"path": "/process-model-publish", "relevant_permissions": ["create"]},
{"path": "/task-data", "relevant_permissions": ["read", "update"]},
]
@ -487,11 +488,10 @@ class AuthorizationService:
@classmethod
def set_basic_permissions(cls) -> list[PermissionToAssign]:
permissions_to_assign: list[PermissionToAssign] = []
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/active-users/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/process-instances/for-me"))
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/active-users/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/users/exists/by-username"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/connector-proxy/typeahead/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/debug/version-info"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-groups"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/process-models"))
@ -499,7 +499,11 @@ class AuthorizationService:
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/processes/callers"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/service-tasks"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/user-groups/for-current-user"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/users/exists/by-username"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/users/search"))
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/report-metadata")
)
permissions_to_assign.append(
PermissionToAssign(permission="read", target_uri="/process-instances/find-by-id/*")
)
@ -522,13 +526,15 @@ class AuthorizationService:
# FIXME: we need to fix so that user that can start a process-model
# can also start through messages as well
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/messages/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/messages"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/authentications"))
permissions_to_assign.append(
PermissionToAssign(permission="create", target_uri="/can-run-privileged-script/*")
)
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/debug/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/send-event/*"))
permissions_to_assign.append(PermissionToAssign(permission="create", target_uri="/task-complete/*"))
permissions_to_assign.append(PermissionToAssign(permission="read", target_uri="/users/search"))
# read comes from PG and PM permissions
permissions_to_assign.append(PermissionToAssign(permission="update", target_uri="/task-data/*"))
@ -697,6 +703,8 @@ class AuthorizationService:
group_identifier = group["name"]
GroupService.find_or_create_group(group_identifier)
for username in group["users"]:
if user_model and username != user_model.username:
continue
user_to_group_dict: UserToGroupDict = {
"username": username,
"group_identifier": group_identifier,
@ -704,6 +712,10 @@ class AuthorizationService:
user_to_group_identifiers.append(user_to_group_dict)
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
unique_user_group_identifiers.add(group_identifier)
for group in group_permissions:
group_identifier = group["name"]
if user_model and group_identifier not in unique_user_group_identifiers:
continue
for permission in group["permissions"]:
for crud_op in permission["actions"]:
permission_assignments.extend(

View File

@ -25,7 +25,6 @@ from uuid import UUID
import dateparser
import pytz
from flask import current_app
from flask import g
from lxml import etree # type: ignore
from lxml.etree import XMLSyntaxError # type: ignore
from RestrictedPython import safe_globals # type: ignore
@ -1109,7 +1108,7 @@ class ProcessInstanceProcessor:
# TODO: do_engine_steps without a lock
self.do_engine_steps(save=True)
def manual_complete_task(self, task_id: str, execute: bool) -> None:
def manual_complete_task(self, task_id: str, execute: bool, user: UserModel) -> None:
"""Mark the task complete optionally executing it."""
spiff_task = self.bpmn_process_instance.get_task_from_id(UUID(task_id))
event_type = ProcessInstanceEventType.task_skipped.value
@ -1122,7 +1121,7 @@ class ProcessInstanceProcessor:
f" instance {self.process_instance_model.id}"
)
human_task = HumanTaskModel.query.filter_by(task_id=task_id).first()
self.complete_task(spiff_task, human_task=human_task, user=g.user)
self.complete_task(spiff_task, human_task=human_task, user=user)
elif execute:
current_app.logger.info(
f"Manually executing Task {spiff_task.task_spec.name} of process"

View File

@ -11,13 +11,13 @@
<bpmn:scriptTask id="save_key1">
<bpmn:incoming>Flow_1j4jzft</bpmn:incoming>
<bpmn:outgoing>Flow_10xyk22</bpmn:outgoing>
<bpmn:script>save_process_instance_metadata({"key1": "value1"})</bpmn:script>
<bpmn:script>key1 = "value1"</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_10xyk22" sourceRef="save_key1" targetRef="save_key2" />
<bpmn:scriptTask id="save_key2">
<bpmn:incoming>Flow_10xyk22</bpmn:incoming>
<bpmn:outgoing>Flow_01xr2ac</bpmn:outgoing>
<bpmn:script>save_process_instance_metadata({"key2": "value2", "key3": "value3"})</bpmn:script>
<bpmn:script>key2 = "value2"; key3 = "value3"</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_01xr2ac" sourceRef="save_key2" targetRef="Event_1s123jg" />
</bpmn:process>

View File

@ -14,6 +14,7 @@ from werkzeug.test import TestResponse # type: ignore
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.permission_assignment import Permission
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.process_group import ProcessGroup
@ -26,9 +27,11 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import (
ProcessInstanceQueueService,
)
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
@ -56,7 +59,6 @@ class BaseTest:
@staticmethod
def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> Dict[str, str]:
"""Logged_in_headers."""
return dict(Authorization="Bearer " + user.encode_auth_token())
def create_group_and_model_with_bpmn(
@ -314,7 +316,6 @@ class BaseTest:
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Create_user_with_permission."""
user = BaseTest.find_or_create_user(username=username)
return cls.add_permissions_to_user(user, target_uri=target_uri, permission_names=permission_names)
@ -325,7 +326,6 @@ class BaseTest:
target_uri: str = PermissionTargetModel.URI_ALL,
permission_names: Optional[list[str]] = None,
) -> UserModel:
"""Add_permissions_to_user."""
permission_target = AuthorizationService.find_or_create_permission_target(target_uri)
if permission_names is None:
@ -401,3 +401,65 @@ class BaseTest:
def empty_report_metadata_body(self) -> ReportMetadata:
return {"filter_by": [], "columns": [], "order_by": []}
def start_sender_process(
self,
client: FlaskClient,
payload: dict,
group_name: str = "test_group",
) -> ProcessInstanceModel:
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
process_instance = self.create_process_instance_from_process_model(process_model)
processor_send_receive = ProcessInstanceProcessor(process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
payload,
process_instance.process_initiator,
human_task,
)
processor_send_receive.save()
return process_instance
def assure_a_message_was_sent(self, process_instance: ProcessInstanceModel, payload: dict) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert send_message.payload == payload, "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self, process_instance: ProcessInstanceModel) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None

View File

@ -10,7 +10,7 @@ class TestDebugController(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
response = client.get(
response = client.post(
"/v1.0/debug/test-raise-error",
)
assert response.status_code == 500

View File

@ -25,14 +25,12 @@ class TestLoggingService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
process_model = load_test_spec(
process_model_id="misc/category_number_one/simple_form",
# bpmn_file_name="simp.bpmn",
process_model_source_directory="simple_form",
)
process_instance = self.create_process_instance_from_process_model(
@ -85,14 +83,12 @@ class TestLoggingService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
process_model = load_test_spec(
process_model_id="misc/category_number_one/simple_form",
# bpmn_file_name="simp.bpmn",
process_model_source_directory="simple_form",
)
process_instance = self.create_process_instance_from_process_model(

View File

@ -0,0 +1,63 @@
import pytest
from flask import Flask
from flask import g
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.routes.messages_controller import message_send
class TestMessages(BaseTest):
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
process_instance = self.start_sender_process(client, payload, "test_from_api")
self.assure_a_message_was_sent(process_instance, payload)
self.assure_there_is_a_process_waiting_on_a_message(process_instance)
g.user = process_instance.process_initiator
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("Approval Result", {"payload": {"po_number": 5001}})
# Should return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert process_instance.status == "complete"

View File

@ -182,7 +182,7 @@ class TestProcessApi(BaseTest):
user=with_super_admin_user,
)
response = client.post(
f"/v1.0/process-models-natural-language/{process_group_id}",
f"/v1.0/process-model-natural-language/{process_group_id}",
content_type="application/json",
data=json.dumps(body),
headers=self.logged_in_headers(with_super_admin_user),
@ -238,9 +238,6 @@ class TestProcessApi(BaseTest):
process_model_identifier = f"{process_group_id}/{process_model_id}"
initial_primary_process_id = "sample"
terminal_primary_process_id = "new_process_id"
self.create_process_group_with_api(
client=client, user=with_super_admin_user, process_group_id=process_group_id
)
bpmn_file_name = f"{process_model_id}.bpmn"
bpmn_file_source_directory = process_model_id
@ -282,14 +279,11 @@ class TestProcessApi(BaseTest):
) -> None:
"""Test_process_model_delete."""
process_group_id = "test_process_group"
process_group_description = "Test Process Group"
process_model_id = "sample"
process_model_identifier = f"{process_group_id}/{process_model_id}"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_description)
self.create_process_model_with_api(
client,
process_model = load_test_spec(
process_model_id=process_model_identifier,
user=with_super_admin_user,
process_model_source_directory=process_model_id,
)
# assert we have a model
@ -3051,6 +3045,17 @@ class TestProcessApi(BaseTest):
bpmn_file_name="save_process_instance_metadata.bpmn",
process_model_source_directory="save_process_instance_metadata",
)
ProcessModelService.update_process_model(
process_model,
{
"metadata_extraction_paths": [
{"key": "key1", "path": "key1"},
{"key": "key2", "path": "key2"},
{"key": "key3", "path": "key3"},
]
},
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
@ -3188,6 +3193,16 @@ class TestProcessApi(BaseTest):
bpmn_file_name="save_process_instance_metadata.bpmn",
process_model_source_directory="save_process_instance_metadata",
)
ProcessModelService.update_process_model(
process_model,
{
"metadata_extraction_paths": [
{"key": "key1", "path": "key1"},
{"key": "key2", "path": "key2"},
{"key": "key3", "path": "key3"},
]
},
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
@ -3262,7 +3277,6 @@ class TestProcessApi(BaseTest):
with_super_admin_user: UserModel,
) -> None:
"""Test_process_instance_list_can_order_by_metadata."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/hello_world",
process_model_source_directory="nested-task-data-structure",

View File

@ -1,31 +1,21 @@
"""Test_get_localtime."""
from operator import itemgetter
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.get_all_permissions import GetAllPermissions
from spiffworkflow_backend.services.authorization_service import AuthorizationService
class TestGetAllPermissions(BaseTest):
"""TestGetAllPermissions."""
def test_can_get_all_permissions(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_all_permissions."""
self.find_or_create_user("test_user")
# now that we have everything, try to clear it out...
script_attributes_context = ScriptAttributesContext(
task=None,

View File

@ -3,14 +3,12 @@ import json
from flask import g
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.get_current_user import GetCurrentUser
@ -18,11 +16,8 @@ class TestGetCurrentUser(BaseTest):
def test_get_current_user(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
testuser1 = self.find_or_create_user("testuser1")
testuser1.tenant_specific_field_1 = "456"
db.session.add(testuser1)

View File

@ -1,12 +1,9 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -14,16 +11,11 @@ from spiffworkflow_backend.services.user_service import UserService
class TestGetGroupMembers(BaseTest):
"""TestGetGroupMembers."""
def test_can_get_members_of_a_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_get_members_of_a_group."""
initiator_user = self.find_or_create_user("initiator_user")
testuser1 = self.find_or_create_user("testuser1")
testuser2 = self.find_or_create_user("testuser2")
@ -38,7 +30,6 @@ class TestGetGroupMembers(BaseTest):
UserService.add_user_to_group(testuser2, group_a)
UserService.add_user_to_group(testuser3, group_b)
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/get_group_members",
bpmn_file_name="get_group_members.bpmn",

View File

@ -1,10 +1,7 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -18,12 +15,8 @@ class TestGetLastUserCompletingTask(BaseTest):
def test_get_last_user_completing_task_script_works(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.principal is not None
AuthorizationService.import_permissions_from_yaml_file()

View File

@ -1,9 +1,7 @@
"""Test_get_localtime."""
import datetime
import pytz
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -20,10 +18,7 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestGetLocaltime(BaseTest):
"""TestProcessAPi."""
def test_get_localtime_script_directly(self) -> None:
"""Test_get_localtime_script_directly."""
current_time = datetime.datetime.now()
timezone = "US/Pacific"
process_model_identifier = "test_process_model"
@ -44,17 +39,14 @@ class TestGetLocaltime(BaseTest):
def test_get_localtime_script_through_bpmn(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_process_instance_run."""
initiator_user = self.find_or_create_user("initiator_user")
self.add_permissions_to_user(
initiator_user,
target_uri="/v1.0/process-groups",
permission_names=["read", "create"],
)
self.create_process_group_with_api(client=client, user=initiator_user, process_group_id="test_group")
process_model = load_test_spec(
process_model_id="test_group/get_localtime",
bpmn_file_name="get_localtime.bpmn",

View File

@ -1,10 +1,8 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -18,19 +16,15 @@ class TestGetProcessInitiatorUser(BaseTest):
def test_get_process_initiator_user(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
assert initiator_user.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
process_model = load_test_spec(
process_model_id="misc/category_number_one/simple_form",
# bpmn_file_name="simp.bpmn",
process_model_source_directory="simple_form",
)
process_instance = self.create_process_instance_from_process_model(

View File

@ -1,7 +1,6 @@
"""Test_get_localtime."""
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -15,7 +14,6 @@ class TestRefreshPermissions(BaseTest):
def test_refresh_permissions_requires_elevated_permission(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
basic_user = self.find_or_create_user("basic_user")

View File

@ -1,42 +0,0 @@
"""Test_get_localtime."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class TestSaveProcessInstanceMetadata(BaseTest):
"""TestSaveProcessInstanceMetadata."""
def test_can_save_process_instance_metadata(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_save_process_instance_metadata."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
process_model_id="save_process_instance_metadata/save_process_instance_metadata",
bpmn_file_name="save_process_instance_metadata.bpmn",
process_model_source_directory="save_process_instance_metadata",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
process_instance_metadata = ProcessInstanceMetadataModel.query.filter_by(
process_instance_id=process_instance.id
).all()
assert len(process_instance_metadata) == 3

View File

@ -3,9 +3,9 @@ import pytest
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import GroupPermissionsDict
from spiffworkflow_backend.services.authorization_service import InvalidPermissionError
@ -16,7 +16,6 @@ from spiffworkflow_backend.services.process_instance_processor import (
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
@ -60,7 +59,6 @@ class TestAuthorizationService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_user_can_be_added_to_human_task_on_first_login."""
initiator_user = self.find_or_create_user("initiator_user")
@ -69,16 +67,12 @@ class TestAuthorizationService(BaseTest):
self.find_or_create_user("testuser1")
AuthorizationService.import_permissions_from_yaml_file()
process_model_identifier = self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id="test_group",
process_model_id="model_with_lanes",
process_model = load_test_spec(
process_model_id="test_group/model_with_lanes",
bpmn_file_name="lanes.bpmn",
bpmn_file_location="model_with_lanes",
process_model_source_directory="model_with_lanes",
)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
@ -141,6 +135,7 @@ class TestAuthorizationService(BaseTest):
"delete",
),
("/process-instances/some-process-group:some-process-model:*", "read"),
("/process-model-natural-language/some-process-group:some-process-model:*", "create"),
("/process-model-publish/some-process-group:some-process-model:*", "create"),
("/process-models/some-process-group:some-process-model:*", "create"),
("/process-models/some-process-group:some-process-model:*", "delete"),
@ -163,26 +158,28 @@ class TestAuthorizationService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_group."""
expected_permissions = [
("/event-error-details/some-process-group:some-process-model:*", "read"),
(
"/logs/some-process-group:some-process-model:*",
"read",
),
(
"/logs/typeahead-filter-values/some-process-group:some-process-model:*",
"read",
),
(
"/process-data-file-download/some-process-group:some-process-model:*",
"read",
),
(
"/process-instances/for-me/some-process-group:some-process-model:*",
"read",
),
("/process-instances/some-process-group:some-process-model:*", "create"),
]
expected_permissions = sorted(
[
("/event-error-details/some-process-group:some-process-model:*", "read"),
(
"/logs/some-process-group:some-process-model:*",
"read",
),
(
"/logs/typeahead-filter-values/some-process-group:some-process-model:*",
"read",
),
(
"/process-data-file-download/some-process-group:some-process-model:*",
"read",
),
(
"/process-instances/for-me/some-process-group:some-process-model:*",
"read",
),
("/process-instances/some-process-group:some-process-model:*", "create"),
]
)
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PG:/some-process-group/some-process-model"
)
@ -222,6 +219,7 @@ class TestAuthorizationService(BaseTest):
"delete",
),
("/process-instances/some-process-group:some-process-model/*", "read"),
("/process-model-natural-language/some-process-group:some-process-model/*", "create"),
("/process-model-publish/some-process-group:some-process-model/*", "create"),
("/process-models/some-process-group:some-process-model/*", "create"),
("/process-models/some-process-group:some-process-model/*", "delete"),
@ -244,26 +242,28 @@ class TestAuthorizationService(BaseTest):
with_db_and_bpmn_file_cleanup: None,
) -> None:
"""Test_explode_permissions_start_on_process_model."""
expected_permissions = [
(
"/event-error-details/some-process-group:some-process-model/*",
"read",
),
(
"/logs/some-process-group:some-process-model/*",
"read",
),
("/logs/typeahead-filter-values/some-process-group:some-process-model/*", "read"),
(
"/process-data-file-download/some-process-group:some-process-model/*",
"read",
),
(
"/process-instances/for-me/some-process-group:some-process-model/*",
"read",
),
("/process-instances/some-process-group:some-process-model/*", "create"),
]
expected_permissions = sorted(
[
(
"/event-error-details/some-process-group:some-process-model/*",
"read",
),
(
"/logs/some-process-group:some-process-model/*",
"read",
),
("/logs/typeahead-filter-values/some-process-group:some-process-model/*", "read"),
(
"/process-data-file-download/some-process-group:some-process-model/*",
"read",
),
(
"/process-instances/for-me/some-process-group:some-process-model/*",
"read",
),
("/process-instances/some-process-group:some-process-model/*", "create"),
]
)
permissions_to_assign = AuthorizationService.explode_permissions(
"start", "PM:/some-process-group/some-process-model"
)
@ -276,28 +276,32 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
("/active-users/*", "read"),
("/debug/version-info", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"),
("/process-instances/for-me", "create"),
("/process-instances/report-metadata", "read"),
("/process-instances/reports/*", "create"),
("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"),
("/process-models", "read"),
("/processes", "read"),
("/processes/callers", "read"),
("/service-tasks", "read"),
("/tasks/*", "create"),
("/tasks/*", "delete"),
("/tasks/*", "read"),
("/tasks/*", "update"),
("/user-groups/for-current-user", "read"),
("/users/exists/by-username", "create"),
]
expected_permissions = sorted(
[
("/active-users/*", "create"),
("/connector-proxy/typeahead/*", "read"),
("/debug/version-info", "read"),
("/process-groups", "read"),
("/process-instances/find-by-id/*", "read"),
("/process-instances/for-me", "create"),
("/process-instances/report-metadata", "read"),
("/process-instances/reports/*", "create"),
("/process-instances/reports/*", "delete"),
("/process-instances/reports/*", "read"),
("/process-instances/reports/*", "update"),
("/process-models", "read"),
("/processes", "read"),
("/processes/callers", "read"),
("/service-tasks", "read"),
("/tasks/*", "create"),
("/tasks/*", "delete"),
("/tasks/*", "read"),
("/tasks/*", "update"),
("/user-groups/for-current-user", "read"),
("/users/exists/by-username", "create"),
("/users/search", "read"),
]
)
permissions_to_assign = AuthorizationService.explode_permissions("all", "BASIC")
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
@ -308,26 +312,30 @@ class TestAuthorizationService(BaseTest):
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
) -> None:
expected_permissions = [
("/can-run-privileged-script/*", "create"),
("/messages/*", "create"),
("/process-instance-reset/*", "create"),
("/process-instance-resume/*", "create"),
("/process-instance-suspend/*", "create"),
("/process-instance-terminate/*", "create"),
("/process-instances/*", "create"),
("/process-instances/*", "delete"),
("/process-instances/*", "read"),
("/process-instances/*", "update"),
("/secrets/*", "create"),
("/secrets/*", "delete"),
("/secrets/*", "read"),
("/secrets/*", "update"),
("/send-event/*", "create"),
("/task-complete/*", "create"),
("/task-data/*", "update"),
("/users/search", "read"),
]
expected_permissions = sorted(
[
("/authentications", "read"),
("/can-run-privileged-script/*", "create"),
("/debug/*", "create"),
("/messages", "read"),
("/messages/*", "create"),
("/process-instance-reset/*", "create"),
("/process-instance-resume/*", "create"),
("/process-instance-suspend/*", "create"),
("/process-instance-terminate/*", "create"),
("/process-instances/*", "create"),
("/process-instances/*", "delete"),
("/process-instances/*", "read"),
("/process-instances/*", "update"),
("/secrets/*", "create"),
("/secrets/*", "delete"),
("/secrets/*", "read"),
("/secrets/*", "update"),
("/send-event/*", "create"),
("/task-complete/*", "create"),
("/task-data/*", "update"),
]
)
permissions_to_assign = AuthorizationService.explode_permissions("all", "ELEVATED")
permissions_to_assign_tuples = sorted([(p.target_uri, p.permission) for p in permissions_to_assign])
assert permissions_to_assign_tuples == expected_permissions
@ -437,7 +445,7 @@ class TestAuthorizationService(BaseTest):
AuthorizationService.refresh_permissions(group_info)
assert GroupModel.query.filter_by(identifier="group_two").first() is None
assert GroupModel.query.filter_by(identifier="group_one").first() is not None
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
self.assert_user_has_permission(admin_user, "create", "/v1.0/process-groups/whatever")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo")
@ -469,7 +477,7 @@ class TestAuthorizationService(BaseTest):
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
self.assert_user_has_permission(user, "create", "/v1.0/process-groups/hey:yo", expected_result=False)
self.assert_user_has_permission(admin_user, "create", "/anything-they-want")
self.assert_user_has_permission(admin_user, "create", "/v1.0/process-groups/whatever")
self.assert_user_has_permission(user_two, "read", "/v1.0/process-groups/hey", expected_result=False)
assert GroupModel.query.filter_by(identifier="group_three").first() is not None

View File

@ -2,8 +2,8 @@
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -18,29 +18,17 @@ class TestDotNotation(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_form_data_conversion_to_dot_dict."""
process_group_id = "dot_notation_group"
process_model_id = "test_dot_notation"
process_model_id = "dot_notation_group/test_dot_notation"
bpmn_file_name = "diagram.bpmn"
bpmn_file_location = "dot_notation"
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
with_super_admin_user,
process_group_id=process_group_id,
process_model = load_test_spec(
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
process_model_source_directory=bpmn_file_location,
)
headers = self.logged_in_headers(with_super_admin_user)
response = self.create_process_instance_from_process_model_id_with_api(
client, process_model_identifier, headers
)
process_instance_id = response.json["id"]
process_instance = ProcessInstanceService().get_process_instance(process_instance_id)
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
human_task = process_instance.human_tasks[0]
@ -53,7 +41,9 @@ class TestDotNotation(BaseTest):
"invoice.invoiceAmount": "1000.00",
"invoice.dueDate": "09/30/2022",
}
ProcessInstanceService.complete_form_task(processor, user_task, form_data, with_super_admin_user, human_task)
ProcessInstanceService.complete_form_task(
processor, user_task, form_data, process_instance.process_initiator, human_task
)
expected = {
"contibutorName": "Elizabeth",

View File

@ -9,14 +9,10 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
@ -27,12 +23,8 @@ class TestErrorHandlingService(BaseTest):
Like it can fire off BPMN messages in case a BPMN Task is waiting for that message.
"""
def run_process_model_and_handle_error(
self, process_model: ProcessModelInfo, user: UserModel
) -> ProcessInstanceModel:
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id, user
)
def run_process_model_and_handle_error(self, process_model: ProcessModelInfo) -> ProcessInstanceModel:
process_instance = self.create_process_instance_from_process_model(process_model)
pip = ProcessInstanceProcessor(process_instance)
with pytest.raises(WorkflowExecutionServiceError) as e:
pip.do_engine_steps(save=True)
@ -44,7 +36,6 @@ class TestErrorHandlingService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Process Model in DB marked as suspended when error occurs."""
process_model = load_test_spec(
@ -54,13 +45,13 @@ class TestErrorHandlingService(BaseTest):
)
# Process instance should be marked as errored by default.
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.error.value == process_instance.status
# If process model should be suspended on error, then that is what should happen.
process_model.fault_or_suspend_on_exception = "suspend"
ProcessModelService.save_process_model(process_model)
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.suspended.value == process_instance.status
def test_error_sends_bpmn_message(
@ -68,7 +59,6 @@ class TestErrorHandlingService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Real BPMN Messages should get generated and processes should fire off and complete."""
process_model = load_test_spec(
@ -85,7 +75,7 @@ class TestErrorHandlingService(BaseTest):
process_model.exception_notification_addresses = ["dan@ILoveToReadErrorsInMyEmails.com"]
ProcessModelService.save_process_model(process_model)
# kick off the process and assure it got marked as an error.
process_instance = self.run_process_model_and_handle_error(process_model, with_super_admin_user)
process_instance = self.run_process_model_and_handle_error(process_model)
assert ProcessInstanceStatus.error.value == process_instance.status
# Both send and receive messages should be generated, matched

View File

@ -3,44 +3,33 @@ import pytest
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.models.process_model import ProcessModelInfo
class TestMessageInstance(BaseTest):
"""TestMessageInstance."""
def setup_message_tests(self, client: FlaskClient, user: UserModel) -> str:
"""Setup_message_tests."""
process_group_id = "test_group"
process_model_id = "hello_world"
def setup_message_tests(self, client: FlaskClient) -> ProcessModelInfo:
process_model_id = "testk_group/hello_world"
bpmn_file_name = "hello_world.bpmn"
bpmn_file_location = "hello_world"
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
user,
process_group_id=process_group_id,
process_model = load_test_spec(
process_model_id=process_model_id,
bpmn_file_name=bpmn_file_name,
bpmn_file_location=bpmn_file_location,
process_model_source_directory=bpmn_file_location,
)
return process_model_identifier
return process_model
def test_can_create_message_instance(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_create_message_instance."""
message_name = "Message Model One"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel(
@ -64,12 +53,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_status."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception:
@ -100,13 +86,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_cannot_set_invalid_message_type."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
with pytest.raises(ValueError) as exception:
@ -136,13 +118,9 @@ class TestMessageInstance(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_force_failure_cause_if_status_is_failure."""
message_name = "message_model_one"
process_model_identifier = self.setup_message_tests(client, with_super_admin_user)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_model = self.setup_message_tests(client)
process_instance = self.create_process_instance_from_process_model(process_model, "waiting")
queued_message = MessageInstanceModel(

View File

@ -1,15 +1,10 @@
"""Test_message_service."""
import pytest
from flask import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.routes.messages_controller import message_send
from spiffworkflow_backend.services.message_service import MessageService
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
@ -20,66 +15,11 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestMessageService(BaseTest):
"""TestMessageService."""
def test_message_from_api_into_running_process(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test sending a message to a running process via the API.
This example workflow will send a message called 'request_approval' and then wait for a response message
of 'approval_result'. This test assures that it will fire the message with the correct correlation properties
and will respond only to a message called 'approval_result' that has the matching correlation properties,
as sent by an API Call.
"""
self.payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
"amount": "100.00",
}
self.start_sender_process(client, with_super_admin_user, "test_from_api")
self.assure_a_message_was_sent()
self.assure_there_is_a_process_waiting_on_a_message()
# Make an API call to the service endpoint, but use the wrong po number
with pytest.raises(ApiError):
message_send("Approval Result", {"payload": {"po_number": 5001}})
# Should return an error when making an API call for right po number, wrong client
with pytest.raises(ApiError):
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}},
)
# No error when calling with the correct parameters
message_send(
"Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}},
)
# There is no longer a waiting message
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == "complete"
def test_single_conversation_between_two_processes(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test messages between two different running processes using a single conversation.
@ -87,7 +27,7 @@ class TestMessageService(BaseTest):
we have two process instances that are communicating with each other using one conversation about an
Invoice whose details are defined in the following message payload
"""
self.payload = {
payload = {
"customer_id": "Sartography",
"po_number": 1001,
"description": "We built a new feature for messages!",
@ -104,8 +44,8 @@ class TestMessageService(BaseTest):
)
# Now start the main process
self.start_sender_process(client, with_super_admin_user, "test_between_processes")
self.assure_a_message_was_sent()
process_instance = self.start_sender_process(client, payload, "test_between_processes")
self.assure_a_message_was_sent(process_instance, payload)
# This is typically called in a background cron process, so we will manually call it
# here in the tests
@ -113,7 +53,7 @@ class TestMessageService(BaseTest):
MessageService.correlate_all_message_instances()
# The sender process should still be waiting on a message to be returned to it ...
self.assure_there_is_a_process_waiting_on_a_message()
self.assure_there_is_a_process_waiting_on_a_message(process_instance)
# The second time we call ths process_message_isntances (again it would typically be running on cron)
# it will deliver the message that was sent from the receiver back to the original sender.
@ -125,7 +65,7 @@ class TestMessageService(BaseTest):
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.filter_by(process_instance_id=process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
@ -136,7 +76,7 @@ class TestMessageService(BaseTest):
assert len(waiting_messages) == 0
# The message sender process is complete
assert self.process_instance.status == "complete"
assert process_instance.status == "complete"
# The message receiver process is also complete
message_receiver_process = (
@ -146,83 +86,14 @@ class TestMessageService(BaseTest):
)
assert message_receiver_process.status == "complete"
def start_sender_process(
self,
client: FlaskClient,
with_super_admin_user: UserModel,
group_name: str = "test_group",
) -> None:
process_group_id = group_name
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model = load_test_spec(
"test_group/message",
process_model_source_directory="message_send_one_conversation",
bpmn_file_name="message_sender.bpmn", # Slightly misnamed, it sends and receives
)
self.process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id,
with_super_admin_user,
)
processor_send_receive = ProcessInstanceProcessor(self.process_instance)
processor_send_receive.do_engine_steps(save=True)
task = processor_send_receive.get_all_user_tasks()[0]
human_task = self.process_instance.active_human_tasks[0]
ProcessInstanceService.complete_form_task(
processor_send_receive,
task,
self.payload,
with_super_admin_user,
human_task,
)
processor_send_receive.save()
def assure_a_message_was_sent(self) -> None:
# There should be one new send message for the given process instance.
send_messages = (
MessageInstanceModel.query.filter_by(message_type="send")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(send_messages) == 1
send_message = send_messages[0]
assert send_message.payload == self.payload, "The send message should match up with the payload"
assert send_message.name == "Request Approval"
assert send_message.status == "ready"
def assure_there_is_a_process_waiting_on_a_message(self) -> None:
# There should be one new send message for the given process instance.
waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive")
.filter_by(status="ready")
.filter_by(process_instance_id=self.process_instance.id)
.order_by(MessageInstanceModel.id)
.all()
)
assert len(waiting_messages) == 1
waiting_message = waiting_messages[0]
self.assure_correlation_properties_are_right(waiting_message)
def assure_correlation_properties_are_right(self, message: MessageInstanceModel) -> None:
# Correlation Properties should match up
po_curr = next(c for c in message.correlation_rules if c.name == "po_number")
customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id")
assert po_curr is not None
assert customer_curr is not None
def test_can_send_message_to_multiple_process_models(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_send_message_to_multiple_process_models."""
process_group_id = "test_group_multi"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
# self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_sender = load_test_spec(
"test_group/message_sender",

View File

@ -1,4 +1,3 @@
"""Test Permissions."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
@ -9,7 +8,6 @@ from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.user_service import UserService
@ -22,18 +20,13 @@ from spiffworkflow_backend.services.user_service import UserService
# * super-admins users maybe conventionally get the user role as well
# finance-admin role allows create, update, and delete of all models under the finance group
class TestPermissions(BaseTest):
"""TestPermissions."""
def test_user_can_be_given_permission_to_administer_process_group(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_user_can_be_given_permission_to_administer_process_group."""
process_group_id = "group-a"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
load_test_spec(
"group-a/timers_intermediate_catch_event",
bpmn_file_name="timers_intermediate_catch_event.bpmn",
@ -58,7 +51,6 @@ class TestPermissions(BaseTest):
def test_group_a_admin_needs_to_stay_away_from_group_b(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_group_a_admin_needs_to_stay_away_from_group_b."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_b_id = process_group_ids[1]
@ -87,7 +79,6 @@ class TestPermissions(BaseTest):
self.assert_user_has_permission(group_a_admin, "update", f"/{process_group_b_id}", expected_result=False)
def test_user_can_be_granted_access_through_a_group(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
"""Test_user_can_be_granted_access_through_a_group."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
for process_group_id in process_group_ids:
@ -125,7 +116,6 @@ class TestPermissions(BaseTest):
def test_user_can_be_read_models_with_global_permission(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_user_can_be_read_models_with_global_permission."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_b_id = process_group_ids[1]
@ -156,7 +146,6 @@ class TestPermissions(BaseTest):
def test_user_can_access_base_path_when_given_wildcard_permission(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_user_can_access_base_path_when_given_wildcard_permission."""
group_a_admin = self.find_or_create_user()
permission_target = PermissionTargetModel(uri="/process-models/%")

View File

@ -18,7 +18,6 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import (
UserDoesNotHaveAccessToTaskError,
@ -70,10 +69,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user = self.find_or_create_user("testuser2")
assert initiator_user.principal is not None
@ -138,10 +136,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_sets_permission_correctly_on_human_task_when_using_dict."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
finance_user_four = self.find_or_create_user("testuser4")
@ -234,7 +231,6 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
initiator_user = self.find_or_create_user("initiator_user")
@ -264,9 +260,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -319,9 +314,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -439,23 +433,20 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/boundary_event_reset",
process_model_source_directory="boundary_event_reset",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
ProcessInstanceService.complete_form_task(
processor, spiff_manual_task, {}, with_super_admin_user, human_task_one
processor, spiff_manual_task, {}, process_instance.process_initiator, human_task_one
)
assert (
len(process_instance.active_human_tasks) == 1
@ -473,7 +464,9 @@ class TestProcessInstanceProcessor(BaseTest):
human_task_one = process_instance.active_human_tasks[0]
assert human_task_one.task_title == "Manual Task #1"
processor = ProcessInstanceProcessor(process_instance)
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
processor.manual_complete_task(
str(human_task_one.task_id), execute=True, user=process_instance.process_initiator
)
processor = ProcessInstanceProcessor(process_instance)
processor.resume()
processor.do_engine_steps(save=True)
@ -490,22 +483,21 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
process_model = load_test_spec(
process_model_id="test_group/step_through_gateway",
process_model_source_directory="step_through_gateway",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
human_task_one = process_instance.active_human_tasks[0]
processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
processor.manual_complete_task(str(human_task_one.task_id), execute=True)
processor.manual_complete_task(
str(human_task_one.task_id), execute=True, user=process_instance.process_initiator
)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
step1_task = processor.get_task_by_bpmn_identifier("step_1", processor.bpmn_process_instance)
@ -516,7 +508,7 @@ class TestProcessInstanceProcessor(BaseTest):
assert gateway_task.state == TaskState.READY
gateway_task = processor.bpmn_process_instance.get_tasks(TaskState.READY)[0]
processor.manual_complete_task(str(gateway_task.id), execute=True)
processor.manual_complete_task(str(gateway_task.id), execute=True, user=process_instance.process_initiator)
processor.save()
processor = ProcessInstanceProcessor(process_instance)
gateway_task = processor.get_task_by_bpmn_identifier("Gateway_Open", processor.bpmn_process_instance)
@ -528,9 +520,8 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -757,10 +748,9 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
self.create_process_group("test_group", "test_group")
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
@ -868,7 +858,6 @@ class TestProcessInstanceProcessor(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_task_data_is_set_even_if_process_instance_errors."""
process_model = load_test_spec(
@ -876,9 +865,7 @@ class TestProcessInstanceProcessor(BaseTest):
bpmn_file_name="script_error_with_task_data.bpmn",
process_model_source_directory="error",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(WorkflowExecutionServiceError):

View File

@ -1,141 +0,0 @@
# from typing import Optional
#
# from flask.app import Flask
# from tests.spiffworkflow_backend.helpers.base_test import BaseTest
#
# from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
# from spiffworkflow_backend.models.process_instance_report import (
# ProcessInstanceReportModel,
# )
#
# # from tests.spiffworkflow_backend.helpers.test_data import find_or_create_process_group
# # from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
# # from spiffworkflow_backend.models.permission_target import PermissionTargetModel
#
#
# def test_generate_report_with_filter_by(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_user_can_be_given_permission_to_administer_process_group."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "filter_by": [
# {"field_name": "grade_level", "operator": "equals", "field_value": 2}
# ]
# }
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 2
# names = get_names_from_results(results)
# assert names == ["kay", "jay"]
#
#
# def test_generate_report_with_filter_by_with_variable_substitution(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_filter_by_with_variable_substitution."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "filter_by": [
# {
# "field_name": "grade_level",
# "operator": "equals",
# "field_value": "{{grade_level}}",
# }
# ]
# }
# results = do_report_with_metadata_and_instances(
# report_metadata, process_instances, {"grade_level": 1}
# )
# assert len(results) == 1
# names = get_names_from_results(results)
# assert names == ["ray"]
#
#
# def test_generate_report_with_order_by_and_one_field(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_and_one_field."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["jay", "ray", "kay"]
#
#
# def test_generate_report_with_order_by_and_two_fields(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_and_two_fields."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["grade_level", "test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["ray", "jay", "kay"]
#
#
# def test_generate_report_with_order_by_desc(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_order_by_desc."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {"order_by": ["grade_level", "-test_score"]}
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 3
# names = get_names_from_results(results)
# assert names == ["ray", "kay", "jay"]
#
#
# def test_generate_report_with_columns(
# app: Flask,
# with_db_and_bpmn_file_cleanup: None,
# setup_process_instances_for_reports: list[ProcessInstanceModel],
# ) -> None:
# """Test_generate_report_with_columns."""
# process_instances = setup_process_instances_for_reports
# report_metadata = {
# "columns": [
# {"Header": "Name", "accessor": "name"},
# {"Header": "Status", "accessor": "status"},
# ],
# "order_by": ["test_score"],
# "filter_by": [
# {"field_name": "grade_level", "operator": "equals", "field_value": 1}
# ],
# }
# results = do_report_with_metadata_and_instances(report_metadata, process_instances)
# assert len(results) == 1
# assert results == [{"name": "ray", "status": "complete"}]
#
#
# def do_report_with_metadata_and_instances(
# report_metadata: dict,
# process_instances: list[ProcessInstanceModel],
# substitution_variables: Optional[dict] = None,
# ) -> list[dict]:
# """Do_report_with_metadata_and_instances."""
# process_instance_report = ProcessInstanceReportModel.create_report(
# identifier="sure",
# report_metadata=report_metadata,
# user=BaseTest.find_or_create_user(),
# )
#
# return process_instance_report.generate_report(
# process_instances, substitution_variables
# )["results"]
#
#
# def get_names_from_results(results: list[dict]) -> list[str]:
# """Get_names_from_results."""
# return [result["name"] for result in results]

View File

@ -9,7 +9,6 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.process_instance_file_data import (
ProcessInstanceFileDataModel,
)
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_service import (
ProcessInstanceService,
)
@ -40,7 +39,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
model = ProcessInstanceService.file_data_model_for_value(
"uploaded_file",
@ -53,7 +51,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
model = ProcessInstanceService.file_data_model_for_value(
"not_a_file",
@ -66,7 +63,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_file": self.SAMPLE_FILE_DATA,
@ -80,7 +76,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_files": [self.SAMPLE_FILE_DATA, self.SAMPLE_FILE_DATA],
@ -95,7 +90,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -120,7 +114,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -135,7 +128,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"uploaded_file": self.SAMPLE_FILE_DATA,
@ -156,7 +148,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -171,7 +162,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"not_a_file": "just a value",
@ -198,7 +188,6 @@ class TestProcessInstanceService(BaseTest):
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
data = {
"File": [

View File

@ -1,11 +1,9 @@
"""Test_various_bpmn_constructs."""
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -18,16 +16,12 @@ class TestRestrictedScriptEngine(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
bpmn_file_name="read_etc_passwd.bpmn",
process_model_source_directory="dangerous-scripts",
)
self.find_or_create_user()
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
@ -40,16 +34,12 @@ class TestRestrictedScriptEngine(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
process_model = load_test_spec(
"test_group/dangerous",
bpmn_file_name="read_env.bpmn",
process_model_source_directory="dangerous-scripts",
)
self.find_or_create_user()
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)

View File

@ -1,10 +1,8 @@
"""Test Permissions."""
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -13,20 +11,15 @@ from spiffworkflow_backend.services.script_unit_test_runner import ScriptUnitTes
class TestScriptUnitTestRunner(BaseTest):
"""TestScriptUnitTestRunner."""
def test_takes_data_and_returns_expected_result(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_takes_data_and_returns_expected_result."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -56,14 +49,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_fails_when_expected_output_does_not_match_actual_output."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "test_logging_spiff_logger"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "simple_script"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -93,14 +82,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests"
process_model_identifier = f"{process_group_id}/{process_model_id}"
load_test_spec(
@ -126,14 +111,10 @@ class TestScriptUnitTestRunner(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_script_with_unit_tests_when_hey_is_not_passed_in."""
app.config["THREAD_LOCAL_DATA"].process_instance_id = None
process_group_id = "script_with_unit_tests"
self.create_process_group_with_api(client, with_super_admin_user, process_group_id, process_group_id)
process_model_id = "script_with_unit_tests"
process_model_identifier = f"{process_group_id}/{process_model_id}"

View File

@ -11,7 +11,6 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.spec_file_service import (
ProcessModelFileInvalidError,
@ -22,8 +21,10 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
class TestSpecFileService(BaseTest):
"""TestSpecFileService."""
process_group_id = "test_process_group_id"
process_model_id = "call_activity_nested"
process_group_id = ""
process_model_id = "test_process_group_id/call_activity_nested"
# process_group_id = "test_process_group_id"
# process_model_id = "call_activity_nested"
bpmn_file_name = "call_activity_nested.bpmn"
call_activity_nested_relative_file_path = os.path.join(process_group_id, process_model_id, bpmn_file_name)
@ -33,16 +34,11 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_can_store_process_ids_for_lookup."""
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location="call_activity_nested",
process_model_source_directory="call_activity_nested",
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
assert len(bpmn_process_id_lookups) == 1
@ -54,17 +50,13 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_fails_to_save_duplicate_process_id."""
bpmn_process_identifier = "Level1"
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory="call_activity_nested",
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
assert len(bpmn_process_id_lookups) == 1
@ -87,7 +79,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_updates_relative_file_path_when_appropriate."""
bpmn_process_identifier = "Level1"
@ -99,13 +90,10 @@ class TestSpecFileService(BaseTest):
db.session.add(process_id_lookup)
db.session.commit()
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory="call_activity_nested",
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
@ -139,7 +127,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""When a BPMN processes identifier is changed in a file, the old id is removed from the cache."""
old_identifier = "ye_old_identifier"
@ -147,19 +134,16 @@ class TestSpecFileService(BaseTest):
identifier=old_identifier,
relative_path=self.call_activity_nested_relative_file_path,
file_name=self.bpmn_file_name,
process_model_id=f"{self.process_group_id}/{self.process_model_id}",
process_model_id=self.process_model_id,
type="process",
)
db.session.add(process_id_lookup)
db.session.commit()
self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=self.process_group_id,
load_test_spec(
process_model_id=self.process_model_id,
bpmn_file_name=self.bpmn_file_name,
bpmn_file_location=self.process_model_id,
process_model_source_directory="call_activity_nested",
)
bpmn_process_id_lookups = SpecReferenceCache.query.all()
@ -173,7 +157,6 @@ class TestSpecFileService(BaseTest):
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_load_reference_information.
@ -186,32 +169,22 @@ class TestSpecFileService(BaseTest):
a DMN file can (theoretically) contain many decisions. So this
is an array.
"""
process_group_id = "test_group"
process_model_id = "call_activity_nested"
process_model_identifier = self.create_group_and_model_with_bpmn(
client=client,
user=with_super_admin_user,
process_group_id=process_group_id,
process_model_id = "test_group/call_activity_nested"
process_model = load_test_spec(
process_model_id=process_model_id,
# bpmn_file_name=bpmn_file_name,
bpmn_file_location=process_model_id,
process_model_source_directory="call_activity_nested",
)
# load_test_spec(
# ,
# process_model_source_directory="call_activity_nested",
# )
process_model_info = ProcessModelService.get_process_model(process_model_identifier)
files = SpecFileService.get_files(process_model_info)
files = SpecFileService.get_files(process_model)
file = next(filter(lambda f: f.name == "call_activity_level_3.bpmn", files))
ca_3 = SpecFileService.get_references_for_file(file, process_model_info)
ca_3 = SpecFileService.get_references_for_file(file, process_model)
assert len(ca_3) == 1
assert ca_3[0].display_name == "Level 3"
assert ca_3[0].identifier == "Level3"
assert ca_3[0].type == "process"
file = next(filter(lambda f: f.name == "level2c.dmn", files))
dmn1 = SpecFileService.get_references_for_file(file, process_model_info)
dmn1 = SpecFileService.get_references_for_file(file, process_model)
assert len(dmn1) == 1
assert dmn1[0].display_name == "Decision 1"
assert dmn1[0].identifier == "Decision_0vrtcmk"

View File

@ -2,34 +2,24 @@
from flask.app import Flask
from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
class TestVariousBpmnConstructs(BaseTest):
"""TestVariousBpmnConstructs."""
def test_running_process_with_timer_intermediate_catch_event(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_running_process_with_timer_intermediate_catch_event."""
process_model_identifier = self.create_group_and_model_with_bpmn(
client,
with_super_admin_user,
"test_group",
"timer_intermediate_catch_event",
process_model = load_test_spec(
process_model_id="test_group/timer_intermediate_catch_event",
process_model_source_directory="timer_intermediate_catch_event",
)
process_model = ProcessModelService.get_process_model(process_model_id=process_model_identifier)
process_instance = self.create_process_instance_from_process_model(process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)

View File

@ -18,6 +18,7 @@ export default function ActiveUsers() {
HttpService.makeCallToBackend({
path: `/active-users/updates/${lastVisitedIdentifier}`,
successCallback: setActiveUsers,
httpMethod: 'POST',
});
};
@ -25,6 +26,7 @@ export default function ActiveUsers() {
HttpService.makeCallToBackend({
path: `/active-users/unregister/${lastVisitedIdentifier}`,
successCallback: setActiveUsers,
httpMethod: 'POST',
});
};
updateActiveUsers();

View File

@ -27,7 +27,7 @@ export default function ProcessModelNewExperimental() {
const handleFormSubmission = (event: any) => {
event.preventDefault();
HttpService.makeCallToBackend({
path: `/process-models-natural-language/${params.process_group_id}`,
path: `/process-model-natural-language/${params.process_group_id}`,
successCallback: navigateToProcessModel,
httpMethod: 'POST',
postBody: { natural_language_text: processModelDescriptiveText },