mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-01-16 05:04:18 +00:00
moved callback code to delegate class in process model test runner w/ burnettk
This commit is contained in:
parent
f1b8349c6e
commit
89f3dbc7b8
@ -3,10 +3,10 @@ import json
|
|||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import traceback
|
import traceback
|
||||||
|
from abc import abstractmethod
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
|
||||||
from typing import Callable
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
from typing import Type
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
from lxml import etree # type: ignore
|
from lxml import etree # type: ignore
|
||||||
@ -34,6 +34,14 @@ class MissingInputTaskData(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class UnsupporterRunnerDelegateGiven(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class BpmnFileMissingExecutableProcessError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class TestCaseErrorDetails:
|
class TestCaseErrorDetails:
|
||||||
error_messages: list[str]
|
error_messages: list[str]
|
||||||
@ -53,6 +61,124 @@ class TestCaseResult:
|
|||||||
test_case_error_details: Optional[TestCaseErrorDetails] = None
|
test_case_error_details: Optional[TestCaseErrorDetails] = None
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessModelTestRunnerDelegate:
|
||||||
|
"""Abstract class for the process model test runner delegate.
|
||||||
|
|
||||||
|
All delegates MUST inherit from this class.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
process_model_directory_path: str,
|
||||||
|
) -> None:
|
||||||
|
self.process_model_directory_path = process_model_directory_path
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow:
|
||||||
|
raise NotImplementedError("method instantiate_executer must be implemented")
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def execute_task(self, spiff_task: SpiffTask, task_data_for_submit: Optional[dict] = None) -> None:
|
||||||
|
raise NotImplementedError("method execute_task must be implemented")
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
|
||||||
|
raise NotImplementedError("method get_next_task must be implemented")
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelegate):
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
process_model_directory_path: str,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(process_model_directory_path)
|
||||||
|
self.bpmn_processes_to_file_mappings: dict[str, str] = {}
|
||||||
|
self.bpmn_files_to_called_element_mappings: dict[str, list[str]] = {}
|
||||||
|
self._discover_process_model_processes()
|
||||||
|
|
||||||
|
def instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow:
|
||||||
|
parser = MyCustomParser()
|
||||||
|
bpmn_file_etree = self._get_etree_from_bpmn_file(bpmn_file)
|
||||||
|
parser.add_bpmn_xml(bpmn_file_etree, filename=os.path.basename(bpmn_file))
|
||||||
|
all_related = self._find_related_bpmn_files(bpmn_file)
|
||||||
|
for related_file in all_related:
|
||||||
|
related_file_etree = self._get_etree_from_bpmn_file(related_file)
|
||||||
|
parser.add_bpmn_xml(related_file_etree, filename=os.path.basename(related_file))
|
||||||
|
sub_parsers = list(parser.process_parsers.values())
|
||||||
|
executable_process = None
|
||||||
|
for sub_parser in sub_parsers:
|
||||||
|
if sub_parser.process_executable:
|
||||||
|
executable_process = sub_parser.bpmn_id
|
||||||
|
if executable_process is None:
|
||||||
|
raise BpmnFileMissingExecutableProcessError(
|
||||||
|
f"Executable process cannot be found in {bpmn_file}. Test cannot run."
|
||||||
|
)
|
||||||
|
bpmn_process_spec = parser.get_spec(executable_process)
|
||||||
|
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
|
||||||
|
return bpmn_process_instance
|
||||||
|
|
||||||
|
def execute_task(self, spiff_task: SpiffTask, task_data_for_submit: Optional[dict] = None) -> None:
|
||||||
|
if task_data_for_submit is not None or spiff_task.task_spec.manual:
|
||||||
|
if task_data_for_submit is not None:
|
||||||
|
spiff_task.update_data(task_data_for_submit)
|
||||||
|
spiff_task.complete()
|
||||||
|
else:
|
||||||
|
spiff_task.run()
|
||||||
|
|
||||||
|
def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
|
||||||
|
ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)])
|
||||||
|
if len(ready_tasks) > 0:
|
||||||
|
return ready_tasks[0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element:
|
||||||
|
data = None
|
||||||
|
with open(bpmn_file, "rb") as f_handle:
|
||||||
|
data = f_handle.read()
|
||||||
|
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||||
|
return etree.fromstring(data, parser=etree_xml_parser)
|
||||||
|
|
||||||
|
def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]:
|
||||||
|
related_bpmn_files = []
|
||||||
|
if bpmn_file in self.bpmn_files_to_called_element_mappings:
|
||||||
|
for bpmn_process_identifier in self.bpmn_files_to_called_element_mappings[bpmn_file]:
|
||||||
|
if bpmn_process_identifier in self.bpmn_processes_to_file_mappings:
|
||||||
|
new_file = self.bpmn_processes_to_file_mappings[bpmn_process_identifier]
|
||||||
|
related_bpmn_files.append(new_file)
|
||||||
|
related_bpmn_files.extend(self._find_related_bpmn_files(new_file))
|
||||||
|
return related_bpmn_files
|
||||||
|
|
||||||
|
def _discover_process_model_processes(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
process_model_bpmn_file_glob = os.path.join(self.process_model_directory_path, "**", "*.bpmn")
|
||||||
|
|
||||||
|
for file in glob.glob(process_model_bpmn_file_glob, recursive=True):
|
||||||
|
file_norm = os.path.normpath(file)
|
||||||
|
if file_norm not in self.bpmn_files_to_called_element_mappings:
|
||||||
|
self.bpmn_files_to_called_element_mappings[file_norm] = []
|
||||||
|
with open(file_norm, "rb") as f:
|
||||||
|
file_contents = f.read()
|
||||||
|
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
||||||
|
|
||||||
|
# if we cannot load process model then ignore it since it can cause errors unrelated
|
||||||
|
# to the test and if it is related, it will most likely be caught further along the test
|
||||||
|
try:
|
||||||
|
root = etree.fromstring(file_contents, parser=etree_xml_parser)
|
||||||
|
except etree.XMLSyntaxError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
call_activities = root.findall(".//bpmn:callActivity", namespaces=DEFAULT_NSMAP)
|
||||||
|
for call_activity in call_activities:
|
||||||
|
if "calledElement" in call_activity.attrib:
|
||||||
|
called_element = call_activity.attrib["calledElement"]
|
||||||
|
self.bpmn_files_to_called_element_mappings[file_norm].append(called_element)
|
||||||
|
bpmn_process_element = root.find('.//bpmn:process[@isExecutable="true"]', namespaces=DEFAULT_NSMAP)
|
||||||
|
if bpmn_process_element is not None:
|
||||||
|
bpmn_process_identifier = bpmn_process_element.attrib["id"]
|
||||||
|
self.bpmn_processes_to_file_mappings[bpmn_process_identifier] = file_norm
|
||||||
|
|
||||||
|
|
||||||
DEFAULT_NSMAP = {
|
DEFAULT_NSMAP = {
|
||||||
"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL",
|
"bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL",
|
||||||
"bpmndi": "http://www.omg.org/spec/BPMN/20100524/DI",
|
"bpmndi": "http://www.omg.org/spec/BPMN/20100524/DI",
|
||||||
@ -93,18 +219,16 @@ JSON file format:
|
|||||||
|
|
||||||
|
|
||||||
class ProcessModelTestRunner:
|
class ProcessModelTestRunner:
|
||||||
"""Generic test runner code. May move into own library at some point.
|
"""Runs the test case json files for a given process model directory.
|
||||||
|
|
||||||
KEEP THIS GENERIC. do not add backend specific code here.
|
It searches for test case files recursively and will run all it finds by default.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
process_model_directory_path: str,
|
process_model_directory_path: str,
|
||||||
|
process_model_test_runner_delegate_class: Type = ProcessModelTestRunnerMostlyPureSpiffDelegate,
|
||||||
process_model_directory_for_test_discovery: Optional[str] = None,
|
process_model_directory_for_test_discovery: Optional[str] = None,
|
||||||
instantiate_executer_callback: Optional[Callable[[str], Any]] = None,
|
|
||||||
execute_task_callback: Optional[Callable[[Any, Optional[str], Optional[dict]], Any]] = None,
|
|
||||||
get_next_task_callback: Optional[Callable[[Any], Any]] = None,
|
|
||||||
test_case_file: Optional[str] = None,
|
test_case_file: Optional[str] = None,
|
||||||
test_case_identifier: Optional[str] = None,
|
test_case_identifier: Optional[str] = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -112,21 +236,24 @@ class ProcessModelTestRunner:
|
|||||||
self.process_model_directory_for_test_discovery = (
|
self.process_model_directory_for_test_discovery = (
|
||||||
process_model_directory_for_test_discovery or process_model_directory_path
|
process_model_directory_for_test_discovery or process_model_directory_path
|
||||||
)
|
)
|
||||||
self.instantiate_executer_callback = instantiate_executer_callback
|
|
||||||
self.execute_task_callback = execute_task_callback
|
|
||||||
self.get_next_task_callback = get_next_task_callback
|
|
||||||
self.test_case_file = test_case_file
|
self.test_case_file = test_case_file
|
||||||
self.test_case_identifier = test_case_identifier
|
self.test_case_identifier = test_case_identifier
|
||||||
|
|
||||||
# keep track of the current task data index
|
if not issubclass(process_model_test_runner_delegate_class, ProcessModelTestRunnerDelegate):
|
||||||
self.task_data_index: dict[str, int] = {}
|
raise UnsupporterRunnerDelegateGiven(
|
||||||
|
"Process model test runner delegate must inherit from ProcessModelTestRunnerDelegate. Given"
|
||||||
|
f" class '{process_model_test_runner_delegate_class}' does not"
|
||||||
|
)
|
||||||
|
|
||||||
self.test_case_results: list[TestCaseResult] = []
|
self.process_model_test_runner_delegate = process_model_test_runner_delegate_class(
|
||||||
self.bpmn_processes_to_file_mappings: dict[str, str] = {}
|
process_model_directory_path
|
||||||
self.bpmn_files_to_called_element_mappings: dict[str, list[str]] = {}
|
)
|
||||||
|
|
||||||
self.test_mappings = self._discover_process_model_test_cases()
|
self.test_mappings = self._discover_process_model_test_cases()
|
||||||
self._discover_process_model_processes()
|
self.test_case_results: list[TestCaseResult] = []
|
||||||
|
|
||||||
|
# keep track of the current task data index
|
||||||
|
self.task_data_index: dict[str, int] = {}
|
||||||
|
|
||||||
def all_test_cases_passed(self) -> bool:
|
def all_test_cases_passed(self) -> bool:
|
||||||
failed_tests = self.failing_tests()
|
failed_tests = self.failing_tests()
|
||||||
@ -178,7 +305,9 @@ class ProcessModelTestRunner:
|
|||||||
test_case_task_properties = test_case_contents["tasks"][test_case_task_key]
|
test_case_task_properties = test_case_contents["tasks"][test_case_task_key]
|
||||||
|
|
||||||
task_type = next_task.task_spec.__class__.__name__
|
task_type = next_task.task_spec.__class__.__name__
|
||||||
if task_type in ["ServiceTask", "UserTask", "CallActivity"] and test_case_task_properties is None:
|
if task_type in ["ServiceTask", "UserTask", "CallActivity"] and (
|
||||||
|
test_case_task_properties is None or "data" not in test_case_task_properties
|
||||||
|
):
|
||||||
raise UnrunnableTestCaseError(
|
raise UnrunnableTestCaseError(
|
||||||
f"Cannot run test case '{test_case_identifier}'. It requires task data for"
|
f"Cannot run test case '{test_case_identifier}'. It requires task data for"
|
||||||
f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
|
f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'"
|
||||||
@ -207,85 +336,10 @@ class ProcessModelTestRunner:
|
|||||||
]
|
]
|
||||||
self._add_test_result(error_message is None, bpmn_file, test_case_identifier, error_message)
|
self._add_test_result(error_message is None, bpmn_file, test_case_identifier, error_message)
|
||||||
|
|
||||||
def _discover_process_model_test_cases(
|
|
||||||
self,
|
|
||||||
) -> dict[str, str]:
|
|
||||||
test_mappings = {}
|
|
||||||
|
|
||||||
json_test_file_glob = os.path.join(self.process_model_directory_for_test_discovery, "**", "test_*.json")
|
|
||||||
|
|
||||||
for file in glob.glob(json_test_file_glob, recursive=True):
|
|
||||||
file_norm = os.path.normpath(file)
|
|
||||||
file_dir = os.path.dirname(file_norm)
|
|
||||||
json_file_name = os.path.basename(file_norm)
|
|
||||||
if self.test_case_file is None or json_file_name == self.test_case_file:
|
|
||||||
bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name)
|
|
||||||
bpmn_file_path = os.path.join(file_dir, bpmn_file_name)
|
|
||||||
if os.path.isfile(bpmn_file_path):
|
|
||||||
test_mappings[file_norm] = bpmn_file_path
|
|
||||||
else:
|
|
||||||
raise MissingBpmnFileForTestCaseError(
|
|
||||||
f"Cannot find a matching bpmn file for test case json file: '{file_norm}'"
|
|
||||||
)
|
|
||||||
return test_mappings
|
|
||||||
|
|
||||||
def _discover_process_model_processes(
|
|
||||||
self,
|
|
||||||
) -> None:
|
|
||||||
process_model_bpmn_file_glob = os.path.join(self.process_model_directory_path, "**", "*.bpmn")
|
|
||||||
|
|
||||||
for file in glob.glob(process_model_bpmn_file_glob, recursive=True):
|
|
||||||
file_norm = os.path.normpath(file)
|
|
||||||
if file_norm not in self.bpmn_files_to_called_element_mappings:
|
|
||||||
self.bpmn_files_to_called_element_mappings[file_norm] = []
|
|
||||||
with open(file_norm, "rb") as f:
|
|
||||||
file_contents = f.read()
|
|
||||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
|
||||||
|
|
||||||
# if we cannot load process model then ignore it since it can cause errors unrelated
|
|
||||||
# to the test and if it is related, it will most likely be caught further along the test
|
|
||||||
try:
|
|
||||||
root = etree.fromstring(file_contents, parser=etree_xml_parser)
|
|
||||||
except etree.XMLSyntaxError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
call_activities = root.findall(".//bpmn:callActivity", namespaces=DEFAULT_NSMAP)
|
|
||||||
for call_activity in call_activities:
|
|
||||||
if "calledElement" in call_activity.attrib:
|
|
||||||
called_element = call_activity.attrib["calledElement"]
|
|
||||||
self.bpmn_files_to_called_element_mappings[file_norm].append(called_element)
|
|
||||||
bpmn_process_element = root.find('.//bpmn:process[@isExecutable="true"]', namespaces=DEFAULT_NSMAP)
|
|
||||||
if bpmn_process_element is not None:
|
|
||||||
bpmn_process_identifier = bpmn_process_element.attrib["id"]
|
|
||||||
self.bpmn_processes_to_file_mappings[bpmn_process_identifier] = file_norm
|
|
||||||
|
|
||||||
def _execute_task(
|
def _execute_task(
|
||||||
self, spiff_task: SpiffTask, test_case_task_key: Optional[str], test_case_task_properties: Optional[dict]
|
self, spiff_task: SpiffTask, test_case_task_key: Optional[str], test_case_task_properties: Optional[dict]
|
||||||
) -> None:
|
) -> None:
|
||||||
if self.execute_task_callback:
|
task_data_for_submit = None
|
||||||
self.execute_task_callback(spiff_task, test_case_task_key, test_case_task_properties)
|
|
||||||
self._default_execute_task(spiff_task, test_case_task_key, test_case_task_properties)
|
|
||||||
|
|
||||||
def _get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
|
|
||||||
if self.get_next_task_callback:
|
|
||||||
return self.get_next_task_callback(bpmn_process_instance)
|
|
||||||
return self._default_get_next_task(bpmn_process_instance)
|
|
||||||
|
|
||||||
def _instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow:
|
|
||||||
if self.instantiate_executer_callback:
|
|
||||||
return self.instantiate_executer_callback(bpmn_file)
|
|
||||||
return self._default_instantiate_executer(bpmn_file)
|
|
||||||
|
|
||||||
def _default_get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
|
|
||||||
ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)])
|
|
||||||
if len(ready_tasks) > 0:
|
|
||||||
return ready_tasks[0]
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _default_execute_task(
|
|
||||||
self, spiff_task: SpiffTask, test_case_task_key: Optional[str], test_case_task_properties: Optional[dict]
|
|
||||||
) -> None:
|
|
||||||
if spiff_task.task_spec.manual or spiff_task.task_spec.__class__.__name__ == "ServiceTask":
|
|
||||||
if test_case_task_key and test_case_task_properties and "data" in test_case_task_properties:
|
if test_case_task_key and test_case_task_properties and "data" in test_case_task_properties:
|
||||||
if test_case_task_key not in self.task_data_index:
|
if test_case_task_key not in self.task_data_index:
|
||||||
self.task_data_index[test_case_task_key] = 0
|
self.task_data_index[test_case_task_key] = 0
|
||||||
@ -296,49 +350,15 @@ class ProcessModelTestRunner:
|
|||||||
f"Missing input task data for task: {test_case_task_key}. "
|
f"Missing input task data for task: {test_case_task_key}. "
|
||||||
f"Only {task_data_length} given in the json but task was called {test_case_index + 1} times"
|
f"Only {task_data_length} given in the json but task was called {test_case_index + 1} times"
|
||||||
)
|
)
|
||||||
spiff_task.update_data(test_case_task_properties["data"][test_case_index])
|
task_data_for_submit = test_case_task_properties["data"][test_case_index]
|
||||||
self.task_data_index[test_case_task_key] += 1
|
self.task_data_index[test_case_task_key] += 1
|
||||||
spiff_task.complete()
|
self.process_model_test_runner_delegate.execute_task(spiff_task, task_data_for_submit)
|
||||||
else:
|
|
||||||
spiff_task.run()
|
|
||||||
|
|
||||||
def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]:
|
def _get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]:
|
||||||
related_bpmn_files = []
|
return self.process_model_test_runner_delegate.get_next_task(bpmn_process_instance)
|
||||||
if bpmn_file in self.bpmn_files_to_called_element_mappings:
|
|
||||||
for bpmn_process_identifier in self.bpmn_files_to_called_element_mappings[bpmn_file]:
|
|
||||||
if bpmn_process_identifier in self.bpmn_processes_to_file_mappings:
|
|
||||||
new_file = self.bpmn_processes_to_file_mappings[bpmn_process_identifier]
|
|
||||||
related_bpmn_files.append(new_file)
|
|
||||||
related_bpmn_files.extend(self._find_related_bpmn_files(new_file))
|
|
||||||
return related_bpmn_files
|
|
||||||
|
|
||||||
def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element:
|
def _instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow:
|
||||||
data = None
|
return self.process_model_test_runner_delegate.instantiate_executer(bpmn_file)
|
||||||
with open(bpmn_file, "rb") as f_handle:
|
|
||||||
data = f_handle.read()
|
|
||||||
etree_xml_parser = etree.XMLParser(resolve_entities=False)
|
|
||||||
return etree.fromstring(data, parser=etree_xml_parser)
|
|
||||||
|
|
||||||
def _default_instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow:
|
|
||||||
parser = MyCustomParser()
|
|
||||||
bpmn_file_etree = self._get_etree_from_bpmn_file(bpmn_file)
|
|
||||||
parser.add_bpmn_xml(bpmn_file_etree, filename=os.path.basename(bpmn_file))
|
|
||||||
all_related = self._find_related_bpmn_files(bpmn_file)
|
|
||||||
for related_file in all_related:
|
|
||||||
related_file_etree = self._get_etree_from_bpmn_file(related_file)
|
|
||||||
parser.add_bpmn_xml(related_file_etree, filename=os.path.basename(related_file))
|
|
||||||
sub_parsers = list(parser.process_parsers.values())
|
|
||||||
executable_process = None
|
|
||||||
for sub_parser in sub_parsers:
|
|
||||||
if sub_parser.process_executable:
|
|
||||||
executable_process = sub_parser.bpmn_id
|
|
||||||
if executable_process is None:
|
|
||||||
raise BpmnFileMissingExecutableProcessError(
|
|
||||||
f"Executable process cannot be found in {bpmn_file}. Test cannot run."
|
|
||||||
)
|
|
||||||
bpmn_process_spec = parser.get_spec(executable_process)
|
|
||||||
bpmn_process_instance = BpmnWorkflow(bpmn_process_spec)
|
|
||||||
return bpmn_process_instance
|
|
||||||
|
|
||||||
def _get_relative_path_of_bpmn_file(self, bpmn_file: str) -> str:
|
def _get_relative_path_of_bpmn_file(self, bpmn_file: str) -> str:
|
||||||
return os.path.relpath(bpmn_file, start=self.process_model_directory_path)
|
return os.path.relpath(bpmn_file, start=self.process_model_directory_path)
|
||||||
@ -382,8 +402,29 @@ class ProcessModelTestRunner:
|
|||||||
)
|
)
|
||||||
self.test_case_results.append(test_result)
|
self.test_case_results.append(test_result)
|
||||||
|
|
||||||
|
def _discover_process_model_test_cases(
|
||||||
|
self,
|
||||||
|
) -> dict[str, str]:
|
||||||
|
test_mappings = {}
|
||||||
|
json_test_file_glob = os.path.join(self.process_model_directory_for_test_discovery, "**", "test_*.json")
|
||||||
|
|
||||||
class BpmnFileMissingExecutableProcessError(Exception):
|
for file in glob.glob(json_test_file_glob, recursive=True):
|
||||||
|
file_norm = os.path.normpath(file)
|
||||||
|
file_dir = os.path.dirname(file_norm)
|
||||||
|
json_file_name = os.path.basename(file_norm)
|
||||||
|
if self.test_case_file is None or json_file_name == self.test_case_file:
|
||||||
|
bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name)
|
||||||
|
bpmn_file_path = os.path.join(file_dir, bpmn_file_name)
|
||||||
|
if os.path.isfile(bpmn_file_path):
|
||||||
|
test_mappings[file_norm] = bpmn_file_path
|
||||||
|
else:
|
||||||
|
raise MissingBpmnFileForTestCaseError(
|
||||||
|
f"Cannot find a matching bpmn file for test case json file: '{file_norm}'"
|
||||||
|
)
|
||||||
|
return test_mappings
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessModeltTestRunnerBackendDelegate(ProcessModelTestRunnerMostlyPureSpiffDelegate):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@ -398,9 +439,7 @@ class ProcessModelTestRunnerService:
|
|||||||
process_model_directory_path,
|
process_model_directory_path,
|
||||||
test_case_file=test_case_file,
|
test_case_file=test_case_file,
|
||||||
test_case_identifier=test_case_identifier,
|
test_case_identifier=test_case_identifier,
|
||||||
# instantiate_executer_callback=self._instantiate_executer_callback,
|
process_model_test_runner_delegate_class=ProcessModeltTestRunnerBackendDelegate,
|
||||||
# execute_task_callback=self._execute_task_callback,
|
|
||||||
# get_next_task_callback=self._get_next_task_callback,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def run(self) -> None:
|
def run(self) -> None:
|
||||||
|
@ -8,6 +8,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|||||||
|
|
||||||
from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError
|
from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError
|
||||||
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner
|
from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner
|
||||||
|
from spiffworkflow_backend.services.process_model_test_runner_service import UnsupporterRunnerDelegateGiven
|
||||||
|
|
||||||
|
|
||||||
class TestProcessModelTestRunner(BaseTest):
|
class TestProcessModelTestRunner(BaseTest):
|
||||||
@ -29,6 +30,16 @@ class TestProcessModelTestRunner(BaseTest):
|
|||||||
process_model_test_runner.run()
|
process_model_test_runner.run()
|
||||||
assert process_model_test_runner.all_test_cases_passed(), process_model_test_runner.test_case_results
|
assert process_model_test_runner.all_test_cases_passed(), process_model_test_runner.test_case_results
|
||||||
|
|
||||||
|
def test_will_raise_if_bad_delegate_is_given(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
with pytest.raises(UnsupporterRunnerDelegateGiven):
|
||||||
|
ProcessModelTestRunner(
|
||||||
|
os.path.join(self.root_path(), "DNE"), process_model_test_runner_delegate_class=NoTestCasesFoundError
|
||||||
|
)
|
||||||
|
|
||||||
def test_can_test_multiple_process_models_with_all_passing_tests(
|
def test_can_test_multiple_process_models_with_all_passing_tests(
|
||||||
self,
|
self,
|
||||||
app: Flask,
|
app: Flask,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user