diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py index db5a0d44..70a73f4f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_model_test_runner_service.py @@ -3,10 +3,10 @@ import json import os import re import traceback +from abc import abstractmethod from dataclasses import dataclass -from typing import Any -from typing import Callable from typing import Optional +from typing import Type from typing import Union from lxml import etree # type: ignore @@ -34,6 +34,14 @@ class MissingInputTaskData(Exception): pass +class UnsupporterRunnerDelegateGiven(Exception): + pass + + +class BpmnFileMissingExecutableProcessError(Exception): + pass + + @dataclass class TestCaseErrorDetails: error_messages: list[str] @@ -53,6 +61,124 @@ class TestCaseResult: test_case_error_details: Optional[TestCaseErrorDetails] = None +class ProcessModelTestRunnerDelegate: + """Abstract class for the process model test runner delegate. + + All delegates MUST inherit from this class. + """ + + def __init__( + self, + process_model_directory_path: str, + ) -> None: + self.process_model_directory_path = process_model_directory_path + + @abstractmethod + def instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow: + raise NotImplementedError("method instantiate_executer must be implemented") + + @abstractmethod + def execute_task(self, spiff_task: SpiffTask, task_data_for_submit: Optional[dict] = None) -> None: + raise NotImplementedError("method execute_task must be implemented") + + @abstractmethod + def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]: + raise NotImplementedError("method get_next_task must be implemented") + + +class ProcessModelTestRunnerMostlyPureSpiffDelegate(ProcessModelTestRunnerDelegate): + def __init__( + self, + process_model_directory_path: str, + ) -> None: + super().__init__(process_model_directory_path) + self.bpmn_processes_to_file_mappings: dict[str, str] = {} + self.bpmn_files_to_called_element_mappings: dict[str, list[str]] = {} + self._discover_process_model_processes() + + def instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow: + parser = MyCustomParser() + bpmn_file_etree = self._get_etree_from_bpmn_file(bpmn_file) + parser.add_bpmn_xml(bpmn_file_etree, filename=os.path.basename(bpmn_file)) + all_related = self._find_related_bpmn_files(bpmn_file) + for related_file in all_related: + related_file_etree = self._get_etree_from_bpmn_file(related_file) + parser.add_bpmn_xml(related_file_etree, filename=os.path.basename(related_file)) + sub_parsers = list(parser.process_parsers.values()) + executable_process = None + for sub_parser in sub_parsers: + if sub_parser.process_executable: + executable_process = sub_parser.bpmn_id + if executable_process is None: + raise BpmnFileMissingExecutableProcessError( + f"Executable process cannot be found in {bpmn_file}. Test cannot run." + ) + bpmn_process_spec = parser.get_spec(executable_process) + bpmn_process_instance = BpmnWorkflow(bpmn_process_spec) + return bpmn_process_instance + + def execute_task(self, spiff_task: SpiffTask, task_data_for_submit: Optional[dict] = None) -> None: + if task_data_for_submit is not None or spiff_task.task_spec.manual: + if task_data_for_submit is not None: + spiff_task.update_data(task_data_for_submit) + spiff_task.complete() + else: + spiff_task.run() + + def get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]: + ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)]) + if len(ready_tasks) > 0: + return ready_tasks[0] + return None + + def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element: + data = None + with open(bpmn_file, "rb") as f_handle: + data = f_handle.read() + etree_xml_parser = etree.XMLParser(resolve_entities=False) + return etree.fromstring(data, parser=etree_xml_parser) + + def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]: + related_bpmn_files = [] + if bpmn_file in self.bpmn_files_to_called_element_mappings: + for bpmn_process_identifier in self.bpmn_files_to_called_element_mappings[bpmn_file]: + if bpmn_process_identifier in self.bpmn_processes_to_file_mappings: + new_file = self.bpmn_processes_to_file_mappings[bpmn_process_identifier] + related_bpmn_files.append(new_file) + related_bpmn_files.extend(self._find_related_bpmn_files(new_file)) + return related_bpmn_files + + def _discover_process_model_processes( + self, + ) -> None: + process_model_bpmn_file_glob = os.path.join(self.process_model_directory_path, "**", "*.bpmn") + + for file in glob.glob(process_model_bpmn_file_glob, recursive=True): + file_norm = os.path.normpath(file) + if file_norm not in self.bpmn_files_to_called_element_mappings: + self.bpmn_files_to_called_element_mappings[file_norm] = [] + with open(file_norm, "rb") as f: + file_contents = f.read() + etree_xml_parser = etree.XMLParser(resolve_entities=False) + + # if we cannot load process model then ignore it since it can cause errors unrelated + # to the test and if it is related, it will most likely be caught further along the test + try: + root = etree.fromstring(file_contents, parser=etree_xml_parser) + except etree.XMLSyntaxError: + continue + + call_activities = root.findall(".//bpmn:callActivity", namespaces=DEFAULT_NSMAP) + for call_activity in call_activities: + if "calledElement" in call_activity.attrib: + called_element = call_activity.attrib["calledElement"] + self.bpmn_files_to_called_element_mappings[file_norm].append(called_element) + bpmn_process_element = root.find('.//bpmn:process[@isExecutable="true"]', namespaces=DEFAULT_NSMAP) + if bpmn_process_element is not None: + bpmn_process_identifier = bpmn_process_element.attrib["id"] + self.bpmn_processes_to_file_mappings[bpmn_process_identifier] = file_norm + + DEFAULT_NSMAP = { "bpmn": "http://www.omg.org/spec/BPMN/20100524/MODEL", "bpmndi": "http://www.omg.org/spec/BPMN/20100524/DI", @@ -93,18 +219,16 @@ JSON file format: class ProcessModelTestRunner: - """Generic test runner code. May move into own library at some point. + """Runs the test case json files for a given process model directory. - KEEP THIS GENERIC. do not add backend specific code here. + It searches for test case files recursively and will run all it finds by default. """ def __init__( self, process_model_directory_path: str, + process_model_test_runner_delegate_class: Type = ProcessModelTestRunnerMostlyPureSpiffDelegate, process_model_directory_for_test_discovery: Optional[str] = None, - instantiate_executer_callback: Optional[Callable[[str], Any]] = None, - execute_task_callback: Optional[Callable[[Any, Optional[str], Optional[dict]], Any]] = None, - get_next_task_callback: Optional[Callable[[Any], Any]] = None, test_case_file: Optional[str] = None, test_case_identifier: Optional[str] = None, ) -> None: @@ -112,21 +236,24 @@ class ProcessModelTestRunner: self.process_model_directory_for_test_discovery = ( process_model_directory_for_test_discovery or process_model_directory_path ) - self.instantiate_executer_callback = instantiate_executer_callback - self.execute_task_callback = execute_task_callback - self.get_next_task_callback = get_next_task_callback self.test_case_file = test_case_file self.test_case_identifier = test_case_identifier - # keep track of the current task data index - self.task_data_index: dict[str, int] = {} + if not issubclass(process_model_test_runner_delegate_class, ProcessModelTestRunnerDelegate): + raise UnsupporterRunnerDelegateGiven( + "Process model test runner delegate must inherit from ProcessModelTestRunnerDelegate. Given" + f" class '{process_model_test_runner_delegate_class}' does not" + ) - self.test_case_results: list[TestCaseResult] = [] - self.bpmn_processes_to_file_mappings: dict[str, str] = {} - self.bpmn_files_to_called_element_mappings: dict[str, list[str]] = {} + self.process_model_test_runner_delegate = process_model_test_runner_delegate_class( + process_model_directory_path + ) self.test_mappings = self._discover_process_model_test_cases() - self._discover_process_model_processes() + self.test_case_results: list[TestCaseResult] = [] + + # keep track of the current task data index + self.task_data_index: dict[str, int] = {} def all_test_cases_passed(self) -> bool: failed_tests = self.failing_tests() @@ -178,7 +305,9 @@ class ProcessModelTestRunner: test_case_task_properties = test_case_contents["tasks"][test_case_task_key] task_type = next_task.task_spec.__class__.__name__ - if task_type in ["ServiceTask", "UserTask", "CallActivity"] and test_case_task_properties is None: + if task_type in ["ServiceTask", "UserTask", "CallActivity"] and ( + test_case_task_properties is None or "data" not in test_case_task_properties + ): raise UnrunnableTestCaseError( f"Cannot run test case '{test_case_identifier}'. It requires task data for" f" {next_task.task_spec.bpmn_id} because it is of type '{task_type}'" @@ -207,138 +336,29 @@ class ProcessModelTestRunner: ] self._add_test_result(error_message is None, bpmn_file, test_case_identifier, error_message) - def _discover_process_model_test_cases( - self, - ) -> dict[str, str]: - test_mappings = {} - - json_test_file_glob = os.path.join(self.process_model_directory_for_test_discovery, "**", "test_*.json") - - for file in glob.glob(json_test_file_glob, recursive=True): - file_norm = os.path.normpath(file) - file_dir = os.path.dirname(file_norm) - json_file_name = os.path.basename(file_norm) - if self.test_case_file is None or json_file_name == self.test_case_file: - bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name) - bpmn_file_path = os.path.join(file_dir, bpmn_file_name) - if os.path.isfile(bpmn_file_path): - test_mappings[file_norm] = bpmn_file_path - else: - raise MissingBpmnFileForTestCaseError( - f"Cannot find a matching bpmn file for test case json file: '{file_norm}'" - ) - return test_mappings - - def _discover_process_model_processes( - self, - ) -> None: - process_model_bpmn_file_glob = os.path.join(self.process_model_directory_path, "**", "*.bpmn") - - for file in glob.glob(process_model_bpmn_file_glob, recursive=True): - file_norm = os.path.normpath(file) - if file_norm not in self.bpmn_files_to_called_element_mappings: - self.bpmn_files_to_called_element_mappings[file_norm] = [] - with open(file_norm, "rb") as f: - file_contents = f.read() - etree_xml_parser = etree.XMLParser(resolve_entities=False) - - # if we cannot load process model then ignore it since it can cause errors unrelated - # to the test and if it is related, it will most likely be caught further along the test - try: - root = etree.fromstring(file_contents, parser=etree_xml_parser) - except etree.XMLSyntaxError: - continue - - call_activities = root.findall(".//bpmn:callActivity", namespaces=DEFAULT_NSMAP) - for call_activity in call_activities: - if "calledElement" in call_activity.attrib: - called_element = call_activity.attrib["calledElement"] - self.bpmn_files_to_called_element_mappings[file_norm].append(called_element) - bpmn_process_element = root.find('.//bpmn:process[@isExecutable="true"]', namespaces=DEFAULT_NSMAP) - if bpmn_process_element is not None: - bpmn_process_identifier = bpmn_process_element.attrib["id"] - self.bpmn_processes_to_file_mappings[bpmn_process_identifier] = file_norm - def _execute_task( self, spiff_task: SpiffTask, test_case_task_key: Optional[str], test_case_task_properties: Optional[dict] ) -> None: - if self.execute_task_callback: - self.execute_task_callback(spiff_task, test_case_task_key, test_case_task_properties) - self._default_execute_task(spiff_task, test_case_task_key, test_case_task_properties) + task_data_for_submit = None + if test_case_task_key and test_case_task_properties and "data" in test_case_task_properties: + if test_case_task_key not in self.task_data_index: + self.task_data_index[test_case_task_key] = 0 + task_data_length = len(test_case_task_properties["data"]) + test_case_index = self.task_data_index[test_case_task_key] + if task_data_length <= test_case_index: + raise MissingInputTaskData( + f"Missing input task data for task: {test_case_task_key}. " + f"Only {task_data_length} given in the json but task was called {test_case_index + 1} times" + ) + task_data_for_submit = test_case_task_properties["data"][test_case_index] + self.task_data_index[test_case_task_key] += 1 + self.process_model_test_runner_delegate.execute_task(spiff_task, task_data_for_submit) def _get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]: - if self.get_next_task_callback: - return self.get_next_task_callback(bpmn_process_instance) - return self._default_get_next_task(bpmn_process_instance) + return self.process_model_test_runner_delegate.get_next_task(bpmn_process_instance) def _instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow: - if self.instantiate_executer_callback: - return self.instantiate_executer_callback(bpmn_file) - return self._default_instantiate_executer(bpmn_file) - - def _default_get_next_task(self, bpmn_process_instance: BpmnWorkflow) -> Optional[SpiffTask]: - ready_tasks = list([t for t in bpmn_process_instance.get_tasks(TaskState.READY)]) - if len(ready_tasks) > 0: - return ready_tasks[0] - return None - - def _default_execute_task( - self, spiff_task: SpiffTask, test_case_task_key: Optional[str], test_case_task_properties: Optional[dict] - ) -> None: - if spiff_task.task_spec.manual or spiff_task.task_spec.__class__.__name__ == "ServiceTask": - if test_case_task_key and test_case_task_properties and "data" in test_case_task_properties: - if test_case_task_key not in self.task_data_index: - self.task_data_index[test_case_task_key] = 0 - task_data_length = len(test_case_task_properties["data"]) - test_case_index = self.task_data_index[test_case_task_key] - if task_data_length <= test_case_index: - raise MissingInputTaskData( - f"Missing input task data for task: {test_case_task_key}. " - f"Only {task_data_length} given in the json but task was called {test_case_index + 1} times" - ) - spiff_task.update_data(test_case_task_properties["data"][test_case_index]) - self.task_data_index[test_case_task_key] += 1 - spiff_task.complete() - else: - spiff_task.run() - - def _find_related_bpmn_files(self, bpmn_file: str) -> list[str]: - related_bpmn_files = [] - if bpmn_file in self.bpmn_files_to_called_element_mappings: - for bpmn_process_identifier in self.bpmn_files_to_called_element_mappings[bpmn_file]: - if bpmn_process_identifier in self.bpmn_processes_to_file_mappings: - new_file = self.bpmn_processes_to_file_mappings[bpmn_process_identifier] - related_bpmn_files.append(new_file) - related_bpmn_files.extend(self._find_related_bpmn_files(new_file)) - return related_bpmn_files - - def _get_etree_from_bpmn_file(self, bpmn_file: str) -> etree._Element: - data = None - with open(bpmn_file, "rb") as f_handle: - data = f_handle.read() - etree_xml_parser = etree.XMLParser(resolve_entities=False) - return etree.fromstring(data, parser=etree_xml_parser) - - def _default_instantiate_executer(self, bpmn_file: str) -> BpmnWorkflow: - parser = MyCustomParser() - bpmn_file_etree = self._get_etree_from_bpmn_file(bpmn_file) - parser.add_bpmn_xml(bpmn_file_etree, filename=os.path.basename(bpmn_file)) - all_related = self._find_related_bpmn_files(bpmn_file) - for related_file in all_related: - related_file_etree = self._get_etree_from_bpmn_file(related_file) - parser.add_bpmn_xml(related_file_etree, filename=os.path.basename(related_file)) - sub_parsers = list(parser.process_parsers.values()) - executable_process = None - for sub_parser in sub_parsers: - if sub_parser.process_executable: - executable_process = sub_parser.bpmn_id - if executable_process is None: - raise BpmnFileMissingExecutableProcessError( - f"Executable process cannot be found in {bpmn_file}. Test cannot run." - ) - bpmn_process_spec = parser.get_spec(executable_process) - bpmn_process_instance = BpmnWorkflow(bpmn_process_spec) - return bpmn_process_instance + return self.process_model_test_runner_delegate.instantiate_executer(bpmn_file) def _get_relative_path_of_bpmn_file(self, bpmn_file: str) -> str: return os.path.relpath(bpmn_file, start=self.process_model_directory_path) @@ -382,8 +402,29 @@ class ProcessModelTestRunner: ) self.test_case_results.append(test_result) + def _discover_process_model_test_cases( + self, + ) -> dict[str, str]: + test_mappings = {} + json_test_file_glob = os.path.join(self.process_model_directory_for_test_discovery, "**", "test_*.json") -class BpmnFileMissingExecutableProcessError(Exception): + for file in glob.glob(json_test_file_glob, recursive=True): + file_norm = os.path.normpath(file) + file_dir = os.path.dirname(file_norm) + json_file_name = os.path.basename(file_norm) + if self.test_case_file is None or json_file_name == self.test_case_file: + bpmn_file_name = re.sub(r"^test_(.*)\.json", r"\1.bpmn", json_file_name) + bpmn_file_path = os.path.join(file_dir, bpmn_file_name) + if os.path.isfile(bpmn_file_path): + test_mappings[file_norm] = bpmn_file_path + else: + raise MissingBpmnFileForTestCaseError( + f"Cannot find a matching bpmn file for test case json file: '{file_norm}'" + ) + return test_mappings + + +class ProcessModeltTestRunnerBackendDelegate(ProcessModelTestRunnerMostlyPureSpiffDelegate): pass @@ -398,9 +439,7 @@ class ProcessModelTestRunnerService: process_model_directory_path, test_case_file=test_case_file, test_case_identifier=test_case_identifier, - # instantiate_executer_callback=self._instantiate_executer_callback, - # execute_task_callback=self._execute_task_callback, - # get_next_task_callback=self._get_next_task_callback, + process_model_test_runner_delegate_class=ProcessModeltTestRunnerBackendDelegate, ) def run(self) -> None: diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_model_test_runner.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_model_test_runner.py index 78e6b2e1..2984568d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_model_test_runner.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_model_test_runner.py @@ -8,6 +8,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest from spiffworkflow_backend.services.process_model_test_runner_service import NoTestCasesFoundError from spiffworkflow_backend.services.process_model_test_runner_service import ProcessModelTestRunner +from spiffworkflow_backend.services.process_model_test_runner_service import UnsupporterRunnerDelegateGiven class TestProcessModelTestRunner(BaseTest): @@ -29,6 +30,16 @@ class TestProcessModelTestRunner(BaseTest): process_model_test_runner.run() assert process_model_test_runner.all_test_cases_passed(), process_model_test_runner.test_case_results + def test_will_raise_if_bad_delegate_is_given( + self, + app: Flask, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + with pytest.raises(UnsupporterRunnerDelegateGiven): + ProcessModelTestRunner( + os.path.join(self.root_path(), "DNE"), process_model_test_runner_delegate_class=NoTestCasesFoundError + ) + def test_can_test_multiple_process_models_with_all_passing_tests( self, app: Flask,