allow prepending test case data with process id and added better error formatting w/ burnettk

This commit is contained in:
jasquat 2023-05-18 15:11:30 -04:00
parent 40c67f000c
commit 0bd16283fc
6 changed files with 104 additions and 52 deletions

View File

@ -13,7 +13,6 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from spiffworkflow_backend.services.spec_file_service import SpecFileService
# workflow json for test case
@ -45,17 +44,24 @@ class TestCaseResult:
passed: bool
bpmn_file: str
test_case_name: str
error: Optional[str] = None
error_messages: Optional[list[str]] = None
DEFAULT_NSMAP = {
'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL',
'bpmndi': 'http://www.omg.org/spec/BPMN/20100524/DI',
'dc': 'http://www.omg.org/spec/DD/20100524/DC',
"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL",
"bpmndi": "http://www.omg.org/spec/BPMN/20100524/DI",
"dc": "http://www.omg.org/spec/DD/20100524/DC",
}
# input:
# BPMN_TASK_IDENTIIFER:
# can be either task bpmn identifier or in format:
# [BPMN_PROCESS_ID]:[TASK_BPMN_IDENTIFIER]
# example: 'BasicServiceTaskProcess:service_task_one'
# this allows for tasks to share bpmn identifiers across models
# which is useful for call activities
#
# json_file:
# {
# [TEST_CASE_NAME]: {
@ -78,15 +84,20 @@ class ProcessModelTestRunner:
process_model_directory_path: str,
process_model_directory_for_test_discovery: Optional[str] = None,
instantiate_executer_callback: Optional[Callable[[str], Any]] = None,
execute_task_callback: Optional[Callable[[Any, Optional[dict]], Any]] = None,
execute_task_callback: Optional[Callable[[Any, str, Optional[dict]], Any]] = None,
get_next_task_callback: Optional[Callable[[Any], Any]] = None,
) -> None:
self.process_model_directory_path = process_model_directory_path
self.process_model_directory_for_test_discovery = process_model_directory_for_test_discovery or process_model_directory_path
self.process_model_directory_for_test_discovery = (
process_model_directory_for_test_discovery or process_model_directory_path
)
self.instantiate_executer_callback = instantiate_executer_callback
self.execute_task_callback = execute_task_callback
self.get_next_task_callback = get_next_task_callback
# keep track of the current task data index
self.task_data_index: dict[str, int] = {}
self.test_case_results: list[TestCaseResult] = []
self.bpmn_processes_to_file_mappings: dict[str, str] = {}
self.bpmn_files_to_called_element_mappings: dict[str, list[str]] = {}
@ -95,12 +106,28 @@ class ProcessModelTestRunner:
self._discover_process_model_processes()
def all_test_cases_passed(self) -> bool:
failed_tests = [t for t in self.test_case_results if t.passed is False]
failed_tests = self.failing_tests()
return len(failed_tests) < 1
def failing_tests(self) -> list[TestCaseResult]:
return [t for t in self.test_case_results if t.passed is False]
def failing_tests_formatted(self) -> str:
formatted_tests = ["FAILING TESTS:"]
for failing_test in self.failing_tests():
msg = ''
if failing_test.error_messages:
msg = '\n\t\t'.join(failing_test.error_messages)
formatted_tests.append(
f'\t{failing_test.bpmn_file}: {failing_test.test_case_name}: {msg}'
)
return '\n'.join(formatted_tests)
def run(self) -> None:
if len(self.test_mappings.items()) < 1:
raise NoTestCasesFoundError(f"Could not find any test cases in given directory: {self.process_model_directory_for_test_discovery}")
raise NoTestCasesFoundError(
f"Could not find any test cases in given directory: {self.process_model_directory_for_test_discovery}"
)
for json_test_case_file, bpmn_file in self.test_mappings.items():
with open(json_test_case_file) as f:
json_file_contents = json.loads(f.read())
@ -109,16 +136,21 @@ class ProcessModelTestRunner:
try:
self.run_test_case(bpmn_file, test_case_name, test_case_contents)
except Exception as ex:
self._add_test_result(False, bpmn_file, test_case_name, f"Syntax error: {str(ex)}")
ex_as_array = str(ex).split('\n')
self._add_test_result(False, bpmn_file, test_case_name, ex_as_array)
def run_test_case(self, bpmn_file: str, test_case_name: str, test_case_contents: dict) -> None:
bpmn_process_instance = self._instantiate_executer(bpmn_file)
next_task = self._get_next_task(bpmn_process_instance)
while next_task is not None:
test_case_task_properties = None
test_case_task_key = next_task.task_spec.bpmn_id
if "tasks" in test_case_contents:
if next_task.task_spec.bpmn_id in test_case_contents["tasks"]:
test_case_task_properties = test_case_contents["tasks"][next_task.task_spec.bpmn_id]
if test_case_task_key not in test_case_contents["tasks"]:
# we may need to go to the top level workflow of a given bpmn file
test_case_task_key = f"{next_task.workflow.spec.name}:{next_task.task_spec.bpmn_id}"
if test_case_task_key in test_case_contents["tasks"]:
test_case_task_properties = test_case_contents["tasks"][test_case_task_key]
task_type = next_task.task_spec.__class__.__name__
if task_type in ["ServiceTask", "UserTask", "CallActivity"] and test_case_task_properties is None:
@ -126,17 +158,29 @@ class ProcessModelTestRunner:
f"Cannot run test case '{test_case_name}'. It requires task data for"
f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
)
self._execute_task(next_task, test_case_task_properties)
self._execute_task(next_task, test_case_task_key, test_case_task_properties)
next_task = self._get_next_task(bpmn_process_instance)
test_passed = test_case_contents["expected_output_json"] == bpmn_process_instance.data
error_message = None
if test_passed is False:
error_message = (
f"Expected output did not match actual output:"
f"\nexpected: {test_case_contents['expected_output_json']}"
f"\nactual: {bpmn_process_instance.data}"
)
self._add_test_result(test_passed, bpmn_file, test_case_name, error_message)
if bpmn_process_instance.is_completed() is False:
error_message = [
"Expected process instance to complete but it did not.",
f"Final data was: {bpmn_process_instance.last_task.data}",
f"Last task bpmn id: {bpmn_process_instance.last_task.task_spec.bpmn_id}",
f"Last task type: {bpmn_process_instance.last_task.task_spec.__class__.__name__}",
]
elif bpmn_process_instance.success is False:
error_message = [
"Expected process instance to succeed but it did not.",
f"Final data was: {bpmn_process_instance.data}",
]
elif test_case_contents["expected_output_json"] != bpmn_process_instance.data:
error_message = [
"Expected output did not match actual output:",
f"expected: {test_case_contents['expected_output_json']}",
f"actual: {bpmn_process_instance.data}",
]
self._add_test_result(error_message is None, bpmn_file, test_case_name, error_message)
def _discover_process_model_test_cases(
self,
@ -168,22 +212,22 @@ class ProcessModelTestRunner:
file_norm = os.path.normpath(file)
if file_norm not in self.bpmn_files_to_called_element_mappings:
self.bpmn_files_to_called_element_mappings[file_norm] = []
with open(file_norm, 'rb') as f:
with open(file_norm, "rb") as f:
file_contents = f.read()
etree_xml_parser = etree.XMLParser(resolve_entities=False)
root = etree.fromstring(file_contents, parser=etree_xml_parser)
call_activities = root.findall('.//bpmn:callActivity', namespaces=DEFAULT_NSMAP)
call_activities = root.findall(".//bpmn:callActivity", namespaces=DEFAULT_NSMAP)
for call_activity in call_activities:
called_element = call_activity.attrib['calledElement']
called_element = call_activity.attrib["calledElement"]
self.bpmn_files_to_called_element_mappings[file_norm].append(called_element)
bpmn_process_element = root.find('.//bpmn:process[@isExecutable="true"]', namespaces=DEFAULT_NSMAP)
bpmn_process_identifier = bpmn_process_element.attrib['id']
bpmn_process_identifier = bpmn_process_element.attrib["id"]
self.bpmn_processes_to_file_mappings[bpmn_process_identifier] = file_norm
def _execute_task(self, spiff_task: SpiffTask, test_case_task_properties: Optional[dict]) -> None:
def _execute_task(self, spiff_task: SpiffTask, test_case_task_key: str, test_case_task_properties: Optional[dict]) -> None:
if self.execute_task_callback:
self.execute_task_callback(spiff_task, test_case_task_properties)
self._default_execute_task(spiff_task, test_case_task_properties)
self.execute_task_callback(spiff_task, test_case_task_key, test_case_task_properties)
self._default_execute_task(spiff_task, test_case_task_key, test_case_task_properties)
def _get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
if self.get_next_task_callback:
@ -201,10 +245,13 @@ class ProcessModelTestRunner:
return ready_tasks[0]
return None
def _default_execute_task(self, spiff_task: SpiffTask, test_case_task_properties: Optional[dict]) -> None:
if spiff_task.task_spec.manual or spiff_task.task_spec.__class__.__name__ == 'ServiceTask':
if test_case_task_properties and 'data' in test_case_task_properties:
spiff_task.update_data(test_case_task_properties['data'])
def _default_execute_task(self, spiff_task: SpiffTask, test_case_task_key: str, test_case_task_properties: Optional[dict]) -> None:
if spiff_task.task_spec.manual or spiff_task.task_spec.__class__.__name__ == "ServiceTask":
if test_case_task_properties and "data" in test_case_task_properties:
if test_case_task_key not in self.task_data_index:
self.task_data_index[test_case_task_key] = 0
spiff_task.update_data(test_case_task_properties["data"][self.task_data_index[test_case_task_key]])
self.task_data_index[test_case_task_key] += 1
spiff_task.complete()
else:
spiff_task.run()
@ -247,13 +294,16 @@ class ProcessModelTestRunner:
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
return bpmn_process_instance
def _add_test_result(self, passed: bool, bpmn_file: str, test_case_name: str, error: Optional[str] = None) -> None:
bpmn_file_relative = os.path.relpath(bpmn_file, start=self.process_model_directory_path)
def _get_relative_path_of_bpmn_file(self, bpmn_file: str) -> str:
return os.path.relpath(bpmn_file, start=self.process_model_directory_path)
def _add_test_result(self, passed: bool, bpmn_file: str, test_case_name: str, error_messages: Optional[list[str]] = None) -> None:
bpmn_file_relative = self._get_relative_path_of_bpmn_file(bpmn_file)
test_result = TestCaseResult(
passed=passed,
bpmn_file=bpmn_file_relative,
test_case_name=test_case_name,
error=error,
error_messages=error_messages,
)
self.test_case_results.append(test_result)

View File

@ -10,7 +10,6 @@
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1hzwssi" sourceRef="Activity_0irfg4l" targetRef="Event_0bz40ol" />
<bpmn:callActivity id="Activity_0irfg4l" name="Call Activity" calledElement="BasicManualTaskProcess">
<!-- <bpmn:callActivity id="Activity_0irfg4l" name="Call Activity" calledElement="BasicManualTaskProcess2"> -->
<bpmn:incoming>Flow_0ext5lt</bpmn:incoming>
<bpmn:outgoing>Flow_1hzwssi</bpmn:outgoing>
</bpmn:callActivity>

View File

@ -2,7 +2,7 @@
"test_case_one": {
"tasks": {
"manual_task_one": {
"data": {}
"data": [{}]
}
},
"expected_output_json": {}

View File

@ -7,4 +7,4 @@
"files": [],
"primary_file_name": "A.1.0.2.bpmn",
"primary_process_id": "Process_test_a102_A_1_0_2_bd2e724"
}
}

View File

@ -1,8 +1,8 @@
{
"test_case_one": {
"tasks": {
"service_task_one": {
"data": { "the_result": "result_from_service" }
"BasicServiceTaskProcess:service_task_one": {
"data": [{ "the_result": "result_from_service" }]
}
},
"expected_output_json": { "the_result": "result_from_service" }

View File

@ -10,7 +10,8 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError, ProcessModelTestRunner
from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner
class TestProcessModelTestRunner(BaseTest):
@ -20,7 +21,7 @@ class TestProcessModelTestRunner(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests('basic_script_task')
process_model_test_runner = self._run_model_tests("basic_script_task")
assert len(process_model_test_runner.test_case_results) == 1
def test_will_raise_if_no_tests_found(
@ -29,9 +30,7 @@ class TestProcessModelTestRunner(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = ProcessModelTestRunner(
os.path.join(FileSystemService.root_path(), "DNE")
)
process_model_test_runner = ProcessModelTestRunner(os.path.join(FileSystemService.root_path(), "DNE"))
with pytest.raises(NoTestCasesFoundError):
process_model_test_runner.run()
assert process_model_test_runner.all_test_cases_passed(), process_model_test_runner.test_case_results
@ -51,7 +50,7 @@ class TestProcessModelTestRunner(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(parent_directory='failing_tests')
process_model_test_runner = self._run_model_tests(parent_directory="failing_tests")
assert len(process_model_test_runner.test_case_results) == 1
def test_can_test_process_model_call_activity(
@ -60,7 +59,7 @@ class TestProcessModelTestRunner(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name='basic_call_activity')
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name="basic_call_activity")
assert len(process_model_test_runner.test_case_results) == 1
def test_can_test_process_model_with_service_task(
@ -69,22 +68,26 @@ class TestProcessModelTestRunner(BaseTest):
with_db_and_bpmn_file_cleanup: None,
with_mocked_root_path: Any,
) -> None:
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name='basic_service_task')
process_model_test_runner = self._run_model_tests(bpmn_process_directory_name="basic_service_task")
assert len(process_model_test_runner.test_case_results) == 1
def _run_model_tests(self, bpmn_process_directory_name: Optional[str] = None, parent_directory: str = 'passing_tests') -> ProcessModelTestRunner:
def _run_model_tests(
self, bpmn_process_directory_name: Optional[str] = None, parent_directory: str = "passing_tests"
) -> ProcessModelTestRunner:
base_process_model_dir_path_segments = [FileSystemService.root_path(), parent_directory]
path_segments = base_process_model_dir_path_segments
if bpmn_process_directory_name:
path_segments = path_segments + [bpmn_process_directory_name]
process_model_test_runner = ProcessModelTestRunner(
process_model_directory_path=os.path.join(*base_process_model_dir_path_segments),
process_model_directory_for_test_discovery=os.path.join(*path_segments)
process_model_directory_for_test_discovery=os.path.join(*path_segments),
)
process_model_test_runner.run()
all_tests_expected_to_pass = parent_directory == 'passing_tests'
assert process_model_test_runner.all_test_cases_passed() is all_tests_expected_to_pass, process_model_test_runner.test_case_results
all_tests_expected_to_pass = parent_directory == "passing_tests"
assert (
process_model_test_runner.all_test_cases_passed() is all_tests_expected_to_pass
), process_model_test_runner.failing_tests_formatted()
return process_model_test_runner
@pytest.fixture()