cleaned up process model tests and added support for service tasks w/ burnettk

This commit is contained in:
jasquat 2023-05-17 17:28:51 -04:00
parent acaf3a3c24
commit 40c67f000c
18 changed files with 186 additions and 109 deletions

View File

@ -36,6 +36,10 @@ class MissingBpmnFileForTestCaseError(Exception):
pass pass
class NoTestCasesFoundError(Exception):
pass
@dataclass @dataclass
class TestCaseResult: class TestCaseResult:
passed: bool passed: bool
@ -95,6 +99,8 @@ class ProcessModelTestRunner:
return len(failed_tests) < 1 return len(failed_tests) < 1
def run(self) -> None: def run(self) -> None:
if len(self.test_mappings.items()) < 1:
raise NoTestCasesFoundError(f"Could not find any test cases in given directory: {self.process_model_directory_for_test_discovery}")
for json_test_case_file, bpmn_file in self.test_mappings.items(): for json_test_case_file, bpmn_file in self.test_mappings.items():
with open(json_test_case_file) as f: with open(json_test_case_file) as f:
json_file_contents = json.loads(f.read()) json_file_contents = json.loads(f.read())
@ -189,21 +195,14 @@ class ProcessModelTestRunner:
return self.instantiate_executer_callback(bpmn_file) return self.instantiate_executer_callback(bpmn_file)
return self._default_instantiate_executer(bpmn_file) return self._default_instantiate_executer(bpmn_file)
def _get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual])
if len(tasks) > 0:
tasks = [tasks[0]]
return tasks
def _default_get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]: def _default_get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
engine_steps = self._get_ready_engine_steps(bpmn_process_instance) ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)])
if len(engine_steps) > 0: if len(ready_tasks) > 0:
return engine_steps[0] return ready_tasks[0]
return None return None
def _default_execute_task(self, spiff_task: SpiffTask, test_case_task_properties: Optional[dict]) -> None: def _default_execute_task(self, spiff_task: SpiffTask, test_case_task_properties: Optional[dict]) -> None:
if spiff_task.task_spec.manual: if spiff_task.task_spec.manual or spiff_task.task_spec.__class__.__name__ == 'ServiceTask':
if test_case_task_properties and 'data' in test_case_task_properties: if test_case_task_properties and 'data' in test_case_task_properties:
spiff_task.update_data(test_case_task_properties['data']) spiff_task.update_data(test_case_task_properties['data'])
spiff_task.complete() spiff_task.complete()
@ -274,40 +273,3 @@ class ProcessModelTestRunnerService:
def run(self) -> None: def run(self) -> None:
self.process_model_test_runner.run() self.process_model_test_runner.run()
def _execute_task_callback(self, spiff_task: SpiffTask, _test_case_json: Optional[dict]) -> None:
spiff_task.run()
def _get_next_task_callback(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
engine_steps = self._get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
return engine_steps[0]
return None
def _get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY) if not t.task_spec.manual])
if len(tasks) > 0:
tasks = [tasks[0]]
return tasks
def _instantiate_executer_callback(self, bpmn_file: str) -> BpmnWorkflow:
parser = MyCustomParser()
data = None
with open(bpmn_file, "rb") as f_handle:
data = f_handle.read()
etree_xml_parser = etree.XMLParser(resolve_entities=False)
bpmn = etree.fromstring(data, parser=etree_xml_parser)
parser.add_bpmn_xml(bpmn, filename=os.path.basename(bpmn_file))
sub_parsers = list(parser.process_parsers.values())
executable_process = None
for sub_parser in sub_parsers:
if sub_parser.process_executable:
executable_process = sub_parser.bpmn_id
if executable_process is None:
raise BpmnFileMissingExecutableProcessError(
f"Executable process cannot be found in {bpmn_file}. Test cannot run."
)
bpmn_process_spec = parser.get_spec(executable_process)
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
return bpmn_process_instance

View File

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="BasicServiceTaskProcess" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_19ephzh</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_19ephzh" sourceRef="StartEvent_1" targetRef="service_task_one" />
<bpmn:endEvent id="Event_132m0z7">
<bpmn:incoming>Flow_1dsxn78</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1dsxn78" sourceRef="service_task_one" targetRef="Event_132m0z7" />
<bpmn:serviceTask id="service_task_one" name="Task 2">
<bpmn:extensionElements>
<spiffworkflow:serviceTaskOperator id="http/GetRequest" resultVariable="bamboo_get_employee">
<spiffworkflow:parameters>
<spiffworkflow:parameter id="basic_auth_password" type="str" value="&#34;x&#34;" />
<spiffworkflow:parameter id="basic_auth_username" type="str" value="&#34;secret:BAMBOOHR_API_KEY&#34;" />
<spiffworkflow:parameter id="headers" type="any" value="{&#34;Accept&#34;: &#34;application/json&#34;}" />
<spiffworkflow:parameter id="params" type="any" value="{&#34;fields&#34;: &#34;firstName,lastName&#34;}" />
<spiffworkflow:parameter id="url" type="str" value="f&#34;https://api.bamboohr.com/api/gateway.php/statusresearchdemo/v1/employees/113&#34;" />
</spiffworkflow:parameters>
</spiffworkflow:serviceTaskOperator>
<spiffworkflow:instructionsForEndUser>This is the Service Task Unit Test Screen.</spiffworkflow:instructionsForEndUser>
<spiffworkflow:postScript />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0xx2kop</bpmn:incoming>
<bpmn:outgoing>Flow_1dsxn78</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="BasicServiceTaskProcess">
<bpmndi:BPMNEdge id="Flow_19ephzh_di" bpmnElement="Flow_19ephzh">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0xx2kop_di" bpmnElement="Flow_0xx2kop">
<di:waypoint x="370" y="177" />
<di:waypoint x="430" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1dsxn78_di" bpmnElement="Flow_1dsxn78">
<di:waypoint x="530" y="177" />
<di:waypoint x="592" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_132m0z7_di" bpmnElement="Event_132m0z7">
<dc:Bounds x="592" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1nlg9cc_di" bpmnElement="service_task_one">
<dc:Bounds x="430" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,10 @@
{
"description": "A.1.0.2",
"display_name": "A.1.0.2 - Service Task",
"display_order": 13,
"exception_notification_addresses": [],
"fault_or_suspend_on_exception": "fault",
"files": [],
"primary_file_name": "A.1.0.2.bpmn",
"primary_process_id": "Process_test_a102_A_1_0_2_bd2e724"
}

View File

@ -0,0 +1,10 @@
{
"test_case_one": {
"tasks": {
"service_task_one": {
"data": { "the_result": "result_from_service" }
}
},
"expected_output_json": { "the_result": "result_from_service" }
}
}

View File

@ -0,0 +1,100 @@
import os
from typing import Any
from typing import Optional
import pytest
from flask import current_app
from flask import Flask
from pytest_mock import MockerFixture
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError, ProcessModelTestRunner
class TestProcessModelTestRunner(BaseTest):
def test_can_test_a_simple_process_model(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests('basic_script_task')
assert len(process_model_test_runner.test_case_results) == 1
def test_will_raise_if_no_tests_found(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = ProcessModelTestRunner(
os.path.join(FileSystemService.root_path(), "DNE")
)
with pytest.raises(NoTestCasesFoundError):
process_model_test_runner.run()
assert process_model_test_runner.all_test_cases_passed(), process_model_test_runner.test_case_results
def test_can_test_multiple_process_models_with_all_passing_tests(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests()
assert len(process_model_test_runner.test_case_results) > 1
def test_can_test_multiple_process_models_with_failing_tests(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(parent_directory='failing_tests')
assert len(process_model_test_runner.test_case_results) == 1
def test_can_test_process_model_call_activity(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name='basic_call_activity')
assert len(process_model_test_runner.test_case_results) == 1
def test_can_test_process_model_with_service_task(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name='basic_service_task')
assert len(process_model_test_runner.test_case_results) == 1
def _run_model_tests(self, bpmn_process_directory_name: Optional[str] = None, parent_directory: str = 'passing_tests') -> ProcessModelTestRunner:
base_process_model_dir_path_segments = [FileSystemService.root_path(), parent_directory]
path_segments = base_process_model_dir_path_segments
if bpmn_process_directory_name:
path_segments = path_segments + [bpmn_process_directory_name]
process_model_test_runner = ProcessModelTestRunner(
process_model_directory_path=os.path.join(*base_process_model_dir_path_segments),
process_model_directory_for_test_discovery=os.path.join(*path_segments)
)
process_model_test_runner.run()
all_tests_expected_to_pass = parent_directory == 'passing_tests'
assert process_model_test_runner.all_test_cases_passed() is all_tests_expected_to_pass, process_model_test_runner.test_case_results
return process_model_test_runner
@pytest.fixture()
def with_mocked_root_path(self, mocker: MockerFixture) -> None:
path = os.path.join(
current_app.instance_path,
"..",
"..",
"tests",
"data",
"bpmn_unit_test_process_models",
)
mocker.patch.object(FileSystemService, attribute="root_path", return_value=path)

View File

@ -1,61 +0,0 @@
import os
from typing import Any
import pytest
from flask import current_app
from flask import Flask
from pytest_mock import MockerFixture
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner, ProcessModelTestRunnerService
class TestProcessModelTestRunnerService(BaseTest):
def test_can_test_a_simple_process_model(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
test_runner_service = ProcessModelTestRunnerService(
os.path.join(FileSystemService.root_path(), "basic_script_task")
)
test_runner_service.run()
assert test_runner_service.process_model_test_runner.all_test_cases_passed(), test_runner_service.process_model_test_runner.test_case_results
def test_can_test_multiple_process_models(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
test_runner_service = ProcessModelTestRunnerService(FileSystemService.root_path())
test_runner_service.run()
assert test_runner_service.process_model_test_runner.all_test_cases_passed() is False
def test_can_test_process_model_call_activity(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
test_runner_service = ProcessModelTestRunner(
process_model_directory_path=FileSystemService.root_path(),
process_model_directory_for_test_discovery=os.path.join(FileSystemService.root_path(), "basic_call_activity")
)
test_runner_service.run()
assert test_runner_service.all_test_cases_passed() is True, test_runner_service.test_case_results
@pytest.fixture()
def with_mocked_root_path(self, mocker: MockerFixture) -> None:
path = os.path.join(
current_app.instance_path,
"..",
"..",
"tests",
"data",
"bpmn_unit_test_process_models",
)
mocker.patch.object(FileSystemService, attribute="root_path", return_value=path)